| author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
|---|---|---|
| committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
| commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
| tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/chunk_queue/queue.h | |
| download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz | |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/chunk_queue/queue.h')
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.h | 568 |
1 file changed, 568 insertions, 0 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
new file mode 100644
index 0000000000..55859601a1
--- /dev/null
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -0,0 +1,568 @@
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/vector.h>
+#include <util/generic/ylimits.h>
+#include <util/system/atomic.h>
+#include <util/system/guard.h>
+#include <util/system/spinlock.h>
+#include <util/system/yassert.h>
+
+#include <type_traits>
+#include <utility>
+
+namespace NThreading {
+////////////////////////////////////////////////////////////////////////////////
+// Platform helpers
+
+#if !defined(PLATFORM_CACHE_LINE)
+#define PLATFORM_CACHE_LINE 64
+#endif
+
+#if !defined(PLATFORM_PAGE_SIZE)
+#define PLATFORM_PAGE_SIZE 4 * 1024
+#endif
+
+    template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
+    struct TPadded: public T {
+        char Pad[PadSize - sizeof(T) % PadSize];
+
+        TPadded() {
+            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+            Y_UNUSED(Pad);
+        }
+
+        template <typename... Args>
+        TPadded(Args&&... args)
+            : T(std::forward<Args>(args)...)
+        {
+            static_assert(sizeof(*this) % PadSize == 0, "padding does not work");
+            Y_UNUSED(Pad);
+        }
+    };
+
+    ////////////////////////////////////////////////////////////////////////////////
+    // Type helpers
+
+    namespace NImpl {
+        template <typename T>
+        struct TPodTypeHelper {
+            template <typename TT>
+            static void Write(T* ptr, TT&& value) {
+                *ptr = value;
+            }
+
+            static T Read(T* ptr) {
+                return *ptr;
+            }
+
+            static void Destroy(T* ptr) {
+                Y_UNUSED(ptr);
+            }
+        };
+
+        template <typename T>
+        struct TNonPodTypeHelper {
+            template <typename TT>
+            static void Write(T* ptr, TT&& value) {
+                new (ptr) T(std::forward<TT>(value));
+            }
+
+            static T Read(T* ptr) {
+                return std::move(*ptr);
+            }
+
+            static void Destroy(T* ptr) {
+                (void)ptr; /* Make MSVC happy. */
+                ptr->~T();
+            }
+        };
+
+        template <typename T>
+        using TTypeHelper = std::conditional_t<
+            TTypeTraits<T>::IsPod,
+            TPodTypeHelper<T>,
+            TNonPodTypeHelper<T>>;
+
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////
+    // One producer/one consumer chunked queue.
+
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TOneOneQueue: private TNonCopyable {
+        using TTypeHelper = NImpl::TTypeHelper<T>;
+
+        struct TChunk;
+
+        struct TChunkHeader {
+            size_t Count = 0;
+            TChunk* Next = nullptr;
+        };
+
+        struct TChunk: public TChunkHeader {
+            static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T);
+
+            char Entries[MaxCount * sizeof(T)];
+
+            TChunk() {
+                Y_UNUSED(Entries); // uninitialized
+            }
+
+            ~TChunk() {
+                for (size_t i = 0; i < this->Count; ++i) {
+                    TTypeHelper::Destroy(GetPtr(i));
+                }
+            }
+
+            T* GetPtr(size_t i) {
+                return (T*)Entries + i;
+            }
+        };
+
+        struct TWriterState {
+            TChunk* Chunk = nullptr;
+        };
+
+        struct TReaderState {
+            TChunk* Chunk = nullptr;
+            size_t Count = 0;
+        };
+
+    private:
+        TPadded<TWriterState> Writer;
+        TPadded<TReaderState> Reader;
+
+    public:
+        using TItem = T;
+
+        TOneOneQueue() {
+            Writer.Chunk = Reader.Chunk = new TChunk();
+        }
+
+        ~TOneOneQueue() {
+            DeleteChunks(Reader.Chunk);
+        }
+
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            T* ptr = PrepareWrite();
+            Y_ASSERT(ptr);
+            TTypeHelper::Write(ptr, std::forward<TT>(value));
+            CompleteWrite();
+        }
+
+        bool Dequeue(T& value) {
+            if (T* ptr = PrepareRead()) {
+                value = TTypeHelper::Read(ptr);
+                CompleteRead();
+                return true;
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            return !PrepareRead();
+        }
+
+    protected:
+        T* PrepareWrite() {
+            TChunk* chunk = Writer.Chunk;
+            Y_ASSERT(chunk && !chunk->Next);
+
+            if (chunk->Count != TChunk::MaxCount) {
+                return chunk->GetPtr(chunk->Count);
+            }
+
+            chunk = new TChunk();
+            AtomicSet(Writer.Chunk->Next, chunk);
+            Writer.Chunk = chunk;
+            return chunk->GetPtr(0);
+        }
+
+        void CompleteWrite() {
+            AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1);
+        }
+
+        T* PrepareRead() {
+            TChunk* chunk = Reader.Chunk;
+            Y_ASSERT(chunk);
+
+            for (;;) {
+                size_t writerCount = AtomicGet(chunk->Count);
+                if (Reader.Count != writerCount) {
+                    return chunk->GetPtr(Reader.Count);
+                }
+
+                if (writerCount != TChunk::MaxCount) {
+                    return nullptr;
+                }
+
+                chunk = AtomicGet(chunk->Next);
+                if (!chunk) {
+                    return nullptr;
+                }
+
+                delete Reader.Chunk;
+                Reader.Chunk = chunk;
+                Reader.Count = 0;
+            }
+        }
+
+        void CompleteRead() {
+            ++Reader.Count;
+        }
+
+    private:
+        static void DeleteChunks(TChunk* chunk) {
+            while (chunk) {
+                TChunk* next = chunk->Next;
+                delete chunk;
+                chunk = next;
+            }
+        }
+    };
+
+    ////////////////////////////////////////////////////////////////////////////////
+    // Multiple producers/single consumer partitioned queue.
+    // Provides FIFO guaranties for each producer.
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TManyOneQueue: private TNonCopyable {
+        using TTypeHelper = NImpl::TTypeHelper<T>;
+
+        struct TEntry {
+            T Value;
+            ui64 Tag;
+        };
+
+        struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
+            TAtomic WriteLock = 0;
+
+            using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
+            using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
+
+            using TOneOneQueue<TEntry, ChunkSize>::PrepareRead;
+            using TOneOneQueue<TEntry, ChunkSize>::CompleteRead;
+        };
+
+    private:
+        union {
+            TAtomic WriteTag = 0;
+            char Pad[PLATFORM_CACHE_LINE];
+        };
+
+        TQueueType Queues[Concurrency];
+
+    public:
+        using TItem = T;
+
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            ui64 tag = NextTag();
+            while (!TryEnqueue(std::forward<TT>(value), tag)) {
+                SpinLockPause();
+            }
+        }
+
+        bool Dequeue(T& value) {
+            size_t index = 0;
+            if (TEntry* entry = PrepareRead(index)) {
+                value = TTypeHelper::Read(&entry->Value);
+                Queues[index].CompleteRead();
+                return true;
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                if (!Queues[i].IsEmpty()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+    private:
+        ui64 NextTag() {
+            // TODO: can we avoid synchronization here? it costs 1.5x performance penalty
+            // return GetCycleCount();
+            return AtomicIncrement(WriteTag);
+        }
+
+        template <typename TT>
+        bool TryEnqueue(TT&& value, ui64 tag) {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[i];
+                if (AtomicTryAndTryLock(&queue.WriteLock)) {
+                    TEntry* entry = queue.PrepareWrite();
+                    Y_ASSERT(entry);
+                    TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
+                    entry->Tag = tag;
+                    queue.CompleteWrite();
+                    AtomicUnlock(&queue.WriteLock);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        TEntry* PrepareRead(size_t& index) {
+            TEntry* entry = nullptr;
+            ui64 tag = Max();
+
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TEntry* e = Queues[i].PrepareRead();
+                if (e && e->Tag < tag) {
+                    index = i;
+                    entry = e;
+                    tag = e->Tag;
+                }
+            }
+
+            if (entry) {
+                // need second pass to catch updates within already scanned range
+                size_t candidate = index;
+                for (size_t i = 0; i < candidate; ++i) {
+                    TEntry* e = Queues[i].PrepareRead();
+                    if (e && e->Tag < tag) {
+                        index = i;
+                        entry = e;
+                        tag = e->Tag;
+                    }
+                }
+            }
+
+            return entry;
+        }
+    };
+
+    ////////////////////////////////////////////////////////////////////////////////
+    // Concurrent many-many queue with strong FIFO guaranties.
+    // Writers will not block readers (and vice versa), but will block each other.
+
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+    class TManyManyQueue: private TNonCopyable {
+    private:
+        TPadded<TLock> WriteLock;
+        TPadded<TLock> ReadLock;
+
+        TOneOneQueue<T, ChunkSize> Queue;
+
+    public:
+        using TItem = T;
+
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            with_lock (WriteLock) {
+                Queue.Enqueue(std::forward<TT>(value));
+            }
+        }
+
+        bool Dequeue(T& value) {
+            with_lock (ReadLock) {
+                return Queue.Dequeue(value);
+            }
+        }
+
+        bool IsEmpty() {
+            with_lock (ReadLock) {
+                return Queue.IsEmpty();
+            }
+        }
+    };
+
+    ////////////////////////////////////////////////////////////////////////////////
+    // Multiple producers/single consumer partitioned queue.
+    // Because of random partitioning reordering possible - FIFO not guaranteed!
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TRelaxedManyOneQueue: private TNonCopyable {
+        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+            TAtomic WriteLock = 0;
+        };
+
+    private:
+        union {
+            size_t ReadPos = 0;
+            char Pad[PLATFORM_CACHE_LINE];
+        };
+
+        TQueueType Queues[Concurrency];
+
+    public:
+        using TItem = T;
+
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            while (!TryEnqueue(std::forward<TT>(value))) {
+                SpinLockPause();
+            }
+        }
+
+        bool Dequeue(T& value) {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[ReadPos++ % Concurrency];
+                if (queue.Dequeue(value)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                if (!Queues[i].IsEmpty()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+    private:
+        template <typename TT>
+        bool TryEnqueue(TT&& value) {
+            size_t writePos = GetCycleCount();
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[writePos++ % Concurrency];
+                if (AtomicTryAndTryLock(&queue.WriteLock)) {
+                    queue.Enqueue(std::forward<TT>(value));
+                    AtomicUnlock(&queue.WriteLock);
+                    return true;
+                }
+            }
+            return false;
+        }
+    };
+
+    ////////////////////////////////////////////////////////////////////////////////
+    // Concurrent many-many partitioned queue.
+    // Because of random partitioning reordering possible - FIFO not guaranteed!
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    class TRelaxedManyManyQueue: private TNonCopyable {
+        struct TQueueType: public TOneOneQueue<T, ChunkSize> {
+            union {
+                TAtomic WriteLock = 0;
+                char Pad1[PLATFORM_CACHE_LINE];
+            };
+            union {
+                TAtomic ReadLock = 0;
+                char Pad2[PLATFORM_CACHE_LINE];
+            };
+        };
+
+    private:
+        TQueueType Queues[Concurrency];
+
+    public:
+        using TItem = T;
+
+        template <typename TT>
+        void Enqueue(TT&& value) {
+            while (!TryEnqueue(std::forward<TT>(value))) {
+                SpinLockPause();
+            }
+        }
+
+        bool Dequeue(T& value) {
+            size_t readPos = GetCycleCount();
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[readPos++ % Concurrency];
+                if (AtomicTryAndTryLock(&queue.ReadLock)) {
+                    bool dequeued = queue.Dequeue(value);
+                    AtomicUnlock(&queue.ReadLock);
+                    if (dequeued) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[i];
+                if (AtomicTryAndTryLock(&queue.ReadLock)) {
+                    bool empty = queue.IsEmpty();
+                    AtomicUnlock(&queue.ReadLock);
+                    if (!empty) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
+    private:
+        template <typename TT>
+        bool TryEnqueue(TT&& value) {
+            size_t writePos = GetCycleCount();
+            for (size_t i = 0; i < Concurrency; ++i) {
+                TQueueType& queue = Queues[writePos++ % Concurrency];
+                if (AtomicTryAndTryLock(&queue.WriteLock)) {
+                    queue.Enqueue(std::forward<TT>(value));
+                    AtomicUnlock(&queue.WriteLock);
+                    return true;
+                }
+            }
+            return false;
+        }
+    };
+
+    ////////////////////////////////////////////////////////////////////////////////
+    // Simple wrapper to deal with AutoPtrs
+
+    template <typename T, typename TImpl>
+    class TAutoQueueBase: private TNonCopyable {
+    private:
+        TImpl Impl;
+
+    public:
+        using TItem = TAutoPtr<T>;
+
+        ~TAutoQueueBase() {
+            TAutoPtr<T> value;
+            while (Dequeue(value)) {
+                // do nothing
+            }
+        }
+
+        void Enqueue(TAutoPtr<T> value) {
+            Impl.Enqueue(value.Get());
+            Y_UNUSED(value.Release());
+        }
+
+        bool Dequeue(TAutoPtr<T>& value) {
+            T* ptr = nullptr;
+            if (Impl.Dequeue(ptr)) {
+                value.Reset(ptr);
+                return true;
+            }
+            return false;
+        }
+
+        bool IsEmpty() {
+            return Impl.IsEmpty();
+        }
+    };
+
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>;
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>;
+
+    template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock>
+    using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>;
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>;
+
+    template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
+    using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
+}
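A minimal usage sketch for `TOneOneQueue`, assuming exactly one producer thread and one consumer thread (the contract of this queue); `std::thread`, `assert`, and the item counts are illustrative, not part of the header.

```cpp
#include <library/cpp/threading/chunk_queue/queue.h>

#include <cassert>
#include <thread>

int main() {
    NThreading::TOneOneQueue<int> queue;

    std::thread producer([&] {
        for (int i = 0; i < 1000; ++i) {
            queue.Enqueue(i); // only this thread ever writes
        }
    });

    std::thread consumer([&] {
        int value = 0;
        for (int expected = 0; expected < 1000;) {
            if (queue.Dequeue(value)) { // only this thread ever reads
                assert(value == expected); // FIFO order within the single producer
                ++expected;
            } else {
                SpinLockPause(); // queue momentarily empty, back off and retry
            }
        }
    });

    producer.join();
    consumer.join();
    return 0;
}
```

Because the queue never blocks, an empty `Dequeue` simply returns `false` and the caller decides how to wait; `SpinLockPause()` comes from `util/system/spinlock.h`, which the queue header already includes.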
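A sketch of `TManyOneQueue` under similar assumptions: any number of threads may enqueue, exactly one thread may dequeue, and items from a given producer come out in the order that producer pushed them (per-producer FIFO, as the header comment states). The producer count, item count, and payload type are illustrative.

```cpp
#include <library/cpp/threading/chunk_queue/queue.h>

#include <cassert>
#include <thread>
#include <utility>
#include <vector>

int main() {
    constexpr size_t kProducers = 3;   // illustrative
    constexpr int kPerProducer = 100;  // illustrative
    NThreading::TManyOneQueue<std::pair<size_t, int>> queue;

    std::vector<std::thread> producers;
    for (size_t p = 0; p < kProducers; ++p) {
        producers.emplace_back([&, p] {
            for (int i = 0; i < kPerProducer; ++i) {
                queue.Enqueue(std::make_pair(p, i)); // safe from many threads
            }
        });
    }

    // The single consumer checks that each producer's items arrive in order.
    int next[kProducers] = {};
    std::pair<size_t, int> item;
    for (size_t received = 0; received < kProducers * kPerProducer;) {
        if (queue.Dequeue(item)) { // must only ever be called from one thread
            assert(item.second == next[item.first]);
            ++next[item.first];
            ++received;
        }
    }

    for (auto& t : producers) {
        t.join();
    }
    return 0;
}
```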
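For fully concurrent use, `TManyManyQueue` serializes writers against writers and readers against readers with two separate locks, so global FIFO order is preserved, while the relaxed variants give up that ordering to reduce contention. A rough sketch with arbitrarily chosen thread and item counts:

```cpp
#include <library/cpp/threading/chunk_queue/queue.h>

#include <atomic>
#include <thread>
#include <vector>

int main() {
    NThreading::TManyManyQueue<int> queue;

    constexpr int kProducers = 2;          // illustrative
    constexpr int kItemsPerProducer = 500; // illustrative
    std::atomic<int> consumed{0};

    std::vector<std::thread> threads;
    for (int t = 0; t < kProducers; ++t) {
        threads.emplace_back([&] { // producer: any thread may enqueue
            for (int i = 0; i < kItemsPerProducer; ++i) {
                queue.Enqueue(i);
            }
        });
        threads.emplace_back([&] { // consumer: any thread may dequeue
            int value = 0;
            while (consumed.load() < kProducers * kItemsPerProducer) {
                if (queue.Dequeue(value)) {
                    consumed.fetch_add(1);
                }
            }
        });
    }

    for (auto& t : threads) {
        t.join();
    }
    return 0;
}
```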
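Finally, a sketch of the `TAuto*` aliases, which wrap any of the queues over `T*` and move ownership through `TAutoPtr`: `Enqueue` releases the pointer into the queue, `Dequeue` hands it back, and the wrapper's destructor deletes whatever was never consumed. `TMessage` below is a made-up payload type.

```cpp
#include <library/cpp/threading/chunk_queue/queue.h>

#include <util/generic/ptr.h>
#include <util/generic/string.h>

struct TMessage {
    TString Text;
};

int main() {
    NThreading::TAutoOneOneQueue<TMessage> queue;

    // The queue takes ownership of the raw pointers via TAutoPtr.
    queue.Enqueue(new TMessage{"hello"});
    queue.Enqueue(new TMessage{"world"});

    TAutoPtr<TMessage> msg;
    while (queue.Dequeue(msg)) {
        // Ownership is back in msg here; the object is freed when msg is
        // reassigned or goes out of scope.
    }
    return 0;
} // anything still queued would be deleted by the queue's destructor
```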