diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/threading/chunk_queue/queue.h | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/chunk_queue/queue.h')
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.h | 958 |
1 files changed, 479 insertions, 479 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index 55859601a1..1959b0258c 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -19,21 +19,21 @@ namespace NThreading { // Platform helpers #if !defined(PLATFORM_CACHE_LINE) -#define PLATFORM_CACHE_LINE 64 +#define PLATFORM_CACHE_LINE 64 #endif #if !defined(PLATFORM_PAGE_SIZE) -#define PLATFORM_PAGE_SIZE 4 * 1024 +#define PLATFORM_PAGE_SIZE 4 * 1024 #endif - template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> - struct TPadded: public T { - char Pad[PadSize - sizeof(T) % PadSize]; + template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> + struct TPadded: public T { + char Pad[PadSize - sizeof(T) % PadSize]; - TPadded() { - static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); - Y_UNUSED(Pad); - } + TPadded() { + static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); + Y_UNUSED(Pad); + } template<typename... Args> TPadded(Args&&... args) @@ -42,280 +42,280 @@ namespace NThreading { static_assert(sizeof(*this) % PadSize == 0, "padding does not work"); Y_UNUSED(Pad); } - }; - - //////////////////////////////////////////////////////////////////////////////// - // Type helpers - - namespace NImpl { - template <typename T> - struct TPodTypeHelper { - template <typename TT> - static void Write(T* ptr, TT&& value) { - *ptr = value; - } - - static T Read(T* ptr) { - return *ptr; - } - - static void Destroy(T* ptr) { - Y_UNUSED(ptr); - } - }; - - template <typename T> - struct TNonPodTypeHelper { - template <typename TT> - static void Write(T* ptr, TT&& value) { - new (ptr) T(std::forward<TT>(value)); - } - - static T Read(T* ptr) { - return std::move(*ptr); - } - - static void Destroy(T* ptr) { - (void)ptr; /* Make MSVC happy. */ - ptr->~T(); - } - }; - - template <typename T> - using TTypeHelper = std::conditional_t< - TTypeTraits<T>::IsPod, - TPodTypeHelper<T>, - TNonPodTypeHelper<T>>; - - } - - //////////////////////////////////////////////////////////////////////////////// - // One producer/one consumer chunked queue. - - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TOneOneQueue: private TNonCopyable { - using TTypeHelper = NImpl::TTypeHelper<T>; - - struct TChunk; - - struct TChunkHeader { - size_t Count = 0; - TChunk* Next = nullptr; - }; - - struct TChunk: public TChunkHeader { - static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); - - char Entries[MaxCount * sizeof(T)]; - - TChunk() { - Y_UNUSED(Entries); // uninitialized - } - - ~TChunk() { - for (size_t i = 0; i < this->Count; ++i) { - TTypeHelper::Destroy(GetPtr(i)); - } - } - - T* GetPtr(size_t i) { - return (T*)Entries + i; - } - }; - - struct TWriterState { - TChunk* Chunk = nullptr; - }; - - struct TReaderState { - TChunk* Chunk = nullptr; - size_t Count = 0; - }; - - private: - TPadded<TWriterState> Writer; - TPadded<TReaderState> Reader; - - public: - using TItem = T; - - TOneOneQueue() { - Writer.Chunk = Reader.Chunk = new TChunk(); - } - - ~TOneOneQueue() { - DeleteChunks(Reader.Chunk); - } - - template <typename TT> - void Enqueue(TT&& value) { - T* ptr = PrepareWrite(); - Y_ASSERT(ptr); - TTypeHelper::Write(ptr, std::forward<TT>(value)); - CompleteWrite(); - } - - bool Dequeue(T& value) { - if (T* ptr = PrepareRead()) { - value = TTypeHelper::Read(ptr); - CompleteRead(); - return true; - } - return false; - } - - bool IsEmpty() { - return !PrepareRead(); - } - - protected: - T* PrepareWrite() { - TChunk* chunk = Writer.Chunk; - Y_ASSERT(chunk && !chunk->Next); - - if (chunk->Count != TChunk::MaxCount) { - return chunk->GetPtr(chunk->Count); - } - - chunk = new TChunk(); - AtomicSet(Writer.Chunk->Next, chunk); - Writer.Chunk = chunk; - return chunk->GetPtr(0); - } - - void CompleteWrite() { - AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); - } - - T* PrepareRead() { - TChunk* chunk = Reader.Chunk; - Y_ASSERT(chunk); - - for (;;) { - size_t writerCount = AtomicGet(chunk->Count); - if (Reader.Count != writerCount) { - return chunk->GetPtr(Reader.Count); - } - - if (writerCount != TChunk::MaxCount) { - return nullptr; - } - - chunk = AtomicGet(chunk->Next); - if (!chunk) { - return nullptr; - } - - delete Reader.Chunk; - Reader.Chunk = chunk; - Reader.Count = 0; - } - } + }; - void CompleteRead() { - ++Reader.Count; - } + //////////////////////////////////////////////////////////////////////////////// + // Type helpers - private: - static void DeleteChunks(TChunk* chunk) { - while (chunk) { - TChunk* next = chunk->Next; - delete chunk; - chunk = next; - } - } - }; + namespace NImpl { + template <typename T> + struct TPodTypeHelper { + template <typename TT> + static void Write(T* ptr, TT&& value) { + *ptr = value; + } - //////////////////////////////////////////////////////////////////////////////// - // Multiple producers/single consumer partitioned queue. - // Provides FIFO guaranties for each producer. + static T Read(T* ptr) { + return *ptr; + } + + static void Destroy(T* ptr) { + Y_UNUSED(ptr); + } + }; - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TManyOneQueue: private TNonCopyable { - using TTypeHelper = NImpl::TTypeHelper<T>; + template <typename T> + struct TNonPodTypeHelper { + template <typename TT> + static void Write(T* ptr, TT&& value) { + new (ptr) T(std::forward<TT>(value)); + } + + static T Read(T* ptr) { + return std::move(*ptr); + } + + static void Destroy(T* ptr) { + (void)ptr; /* Make MSVC happy. */ + ptr->~T(); + } + }; + + template <typename T> + using TTypeHelper = std::conditional_t< + TTypeTraits<T>::IsPod, + TPodTypeHelper<T>, + TNonPodTypeHelper<T>>; + + } + + //////////////////////////////////////////////////////////////////////////////// + // One producer/one consumer chunked queue. + + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TOneOneQueue: private TNonCopyable { + using TTypeHelper = NImpl::TTypeHelper<T>; + + struct TChunk; + + struct TChunkHeader { + size_t Count = 0; + TChunk* Next = nullptr; + }; + + struct TChunk: public TChunkHeader { + static constexpr size_t MaxCount = (ChunkSize - sizeof(TChunkHeader)) / sizeof(T); + + char Entries[MaxCount * sizeof(T)]; + + TChunk() { + Y_UNUSED(Entries); // uninitialized + } + + ~TChunk() { + for (size_t i = 0; i < this->Count; ++i) { + TTypeHelper::Destroy(GetPtr(i)); + } + } + + T* GetPtr(size_t i) { + return (T*)Entries + i; + } + }; + + struct TWriterState { + TChunk* Chunk = nullptr; + }; + + struct TReaderState { + TChunk* Chunk = nullptr; + size_t Count = 0; + }; + + private: + TPadded<TWriterState> Writer; + TPadded<TReaderState> Reader; + + public: + using TItem = T; + + TOneOneQueue() { + Writer.Chunk = Reader.Chunk = new TChunk(); + } + + ~TOneOneQueue() { + DeleteChunks(Reader.Chunk); + } + + template <typename TT> + void Enqueue(TT&& value) { + T* ptr = PrepareWrite(); + Y_ASSERT(ptr); + TTypeHelper::Write(ptr, std::forward<TT>(value)); + CompleteWrite(); + } + + bool Dequeue(T& value) { + if (T* ptr = PrepareRead()) { + value = TTypeHelper::Read(ptr); + CompleteRead(); + return true; + } + return false; + } + + bool IsEmpty() { + return !PrepareRead(); + } - struct TEntry { - T Value; - ui64 Tag; - }; - - struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { - TAtomic WriteLock = 0; + protected: + T* PrepareWrite() { + TChunk* chunk = Writer.Chunk; + Y_ASSERT(chunk && !chunk->Next); + + if (chunk->Count != TChunk::MaxCount) { + return chunk->GetPtr(chunk->Count); + } + + chunk = new TChunk(); + AtomicSet(Writer.Chunk->Next, chunk); + Writer.Chunk = chunk; + return chunk->GetPtr(0); + } + + void CompleteWrite() { + AtomicSet(Writer.Chunk->Count, Writer.Chunk->Count + 1); + } - using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; - using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; - - using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; - using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; - }; + T* PrepareRead() { + TChunk* chunk = Reader.Chunk; + Y_ASSERT(chunk); - private: - union { - TAtomic WriteTag = 0; - char Pad[PLATFORM_CACHE_LINE]; - }; - - TQueueType Queues[Concurrency]; - - public: - using TItem = T; - - template <typename TT> - void Enqueue(TT&& value) { - ui64 tag = NextTag(); - while (!TryEnqueue(std::forward<TT>(value), tag)) { - SpinLockPause(); - } - } - - bool Dequeue(T& value) { - size_t index = 0; - if (TEntry* entry = PrepareRead(index)) { - value = TTypeHelper::Read(&entry->Value); - Queues[index].CompleteRead(); - return true; - } - return false; - } + for (;;) { + size_t writerCount = AtomicGet(chunk->Count); + if (Reader.Count != writerCount) { + return chunk->GetPtr(Reader.Count); + } + + if (writerCount != TChunk::MaxCount) { + return nullptr; + } + + chunk = AtomicGet(chunk->Next); + if (!chunk) { + return nullptr; + } + + delete Reader.Chunk; + Reader.Chunk = chunk; + Reader.Count = 0; + } + } + + void CompleteRead() { + ++Reader.Count; + } + + private: + static void DeleteChunks(TChunk* chunk) { + while (chunk) { + TChunk* next = chunk->Next; + delete chunk; + chunk = next; + } + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Multiple producers/single consumer partitioned queue. + // Provides FIFO guaranties for each producer. + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TManyOneQueue: private TNonCopyable { + using TTypeHelper = NImpl::TTypeHelper<T>; + + struct TEntry { + T Value; + ui64 Tag; + }; + + struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { + TAtomic WriteLock = 0; + + using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; + using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; + + using TOneOneQueue<TEntry, ChunkSize>::PrepareRead; + using TOneOneQueue<TEntry, ChunkSize>::CompleteRead; + }; + + private: + union { + TAtomic WriteTag = 0; + char Pad[PLATFORM_CACHE_LINE]; + }; + + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + ui64 tag = NextTag(); + while (!TryEnqueue(std::forward<TT>(value), tag)) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + size_t index = 0; + if (TEntry* entry = PrepareRead(index)) { + value = TTypeHelper::Read(&entry->Value); + Queues[index].CompleteRead(); + return true; + } + return false; + } - bool IsEmpty() { - for (size_t i = 0; i < Concurrency; ++i) { - if (!Queues[i].IsEmpty()) { - return false; - } - } - return true; + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + if (!Queues[i].IsEmpty()) { + return false; + } + } + return true; } - private: - ui64 NextTag() { - // TODO: can we avoid synchronization here? it costs 1.5x performance penalty - // return GetCycleCount(); - return AtomicIncrement(WriteTag); - } + private: + ui64 NextTag() { + // TODO: can we avoid synchronization here? it costs 1.5x performance penalty + // return GetCycleCount(); + return AtomicIncrement(WriteTag); + } - template <typename TT> - bool TryEnqueue(TT&& value, ui64 tag) { - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { - TEntry* entry = queue.PrepareWrite(); - Y_ASSERT(entry); - TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); - entry->Tag = tag; - queue.CompleteWrite(); - AtomicUnlock(&queue.WriteLock); - return true; - } + template <typename TT> + bool TryEnqueue(TT&& value, ui64 tag) { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[i]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + TEntry* entry = queue.PrepareWrite(); + Y_ASSERT(entry); + TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); + entry->Tag = tag; + queue.CompleteWrite(); + AtomicUnlock(&queue.WriteLock); + return true; + } } - return false; + return false; } - TEntry* PrepareRead(size_t& index) { - TEntry* entry = nullptr; - ui64 tag = Max(); + TEntry* PrepareRead(size_t& index) { + TEntry* entry = nullptr; + ui64 tag = Max(); - for (size_t i = 0; i < Concurrency; ++i) { + for (size_t i = 0; i < Concurrency; ++i) { TEntry* e = Queues[i].PrepareRead(); if (e && e->Tag < tag) { index = i; @@ -323,246 +323,246 @@ namespace NThreading { tag = e->Tag; } } - - if (entry) { - // need second pass to catch updates within already scanned range - size_t candidate = index; - for (size_t i = 0; i < candidate; ++i) { - TEntry* e = Queues[i].PrepareRead(); - if (e && e->Tag < tag) { - index = i; - entry = e; - tag = e->Tag; - } - } - } - - return entry; + + if (entry) { + // need second pass to catch updates within already scanned range + size_t candidate = index; + for (size_t i = 0; i < candidate; ++i) { + TEntry* e = Queues[i].PrepareRead(); + if (e && e->Tag < tag) { + index = i; + entry = e; + tag = e->Tag; + } + } + } + + return entry; } - }; + }; - //////////////////////////////////////////////////////////////////////////////// - // Concurrent many-many queue with strong FIFO guaranties. - // Writers will not block readers (and vice versa), but will block each other. + //////////////////////////////////////////////////////////////////////////////// + // Concurrent many-many queue with strong FIFO guaranties. + // Writers will not block readers (and vice versa), but will block each other. - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> - class TManyManyQueue: private TNonCopyable { - private: - TPadded<TLock> WriteLock; - TPadded<TLock> ReadLock; - - TOneOneQueue<T, ChunkSize> Queue; - - public: - using TItem = T; - - template <typename TT> - void Enqueue(TT&& value) { - with_lock (WriteLock) { - Queue.Enqueue(std::forward<TT>(value)); - } + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> + class TManyManyQueue: private TNonCopyable { + private: + TPadded<TLock> WriteLock; + TPadded<TLock> ReadLock; + + TOneOneQueue<T, ChunkSize> Queue; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + with_lock (WriteLock) { + Queue.Enqueue(std::forward<TT>(value)); + } + } + + bool Dequeue(T& value) { + with_lock (ReadLock) { + return Queue.Dequeue(value); + } + } + + bool IsEmpty() { + with_lock (ReadLock) { + return Queue.IsEmpty(); + } + } + }; + + //////////////////////////////////////////////////////////////////////////////// + // Multiple producers/single consumer partitioned queue. + // Because of random partitioning reordering possible - FIFO not guaranteed! + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TRelaxedManyOneQueue: private TNonCopyable { + struct TQueueType: public TOneOneQueue<T, ChunkSize> { + TAtomic WriteLock = 0; + }; + + private: + union { + size_t ReadPos = 0; + char Pad[PLATFORM_CACHE_LINE]; + }; + + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + while (!TryEnqueue(std::forward<TT>(value))) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[ReadPos++ % Concurrency]; + if (queue.Dequeue(value)) { + return true; + } + } + return false; + } + + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + if (!Queues[i].IsEmpty()) { + return false; + } + } + return true; + } + + private: + template <typename TT> + bool TryEnqueue(TT&& value) { + size_t writePos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[writePos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + queue.Enqueue(std::forward<TT>(value)); + AtomicUnlock(&queue.WriteLock); + return true; + } + } + return false; } + }; - bool Dequeue(T& value) { - with_lock (ReadLock) { - return Queue.Dequeue(value); - } - } - - bool IsEmpty() { - with_lock (ReadLock) { - return Queue.IsEmpty(); - } - } - }; - - //////////////////////////////////////////////////////////////////////////////// - // Multiple producers/single consumer partitioned queue. - // Because of random partitioning reordering possible - FIFO not guaranteed! - - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TRelaxedManyOneQueue: private TNonCopyable { - struct TQueueType: public TOneOneQueue<T, ChunkSize> { - TAtomic WriteLock = 0; - }; - - private: - union { - size_t ReadPos = 0; - char Pad[PLATFORM_CACHE_LINE]; + //////////////////////////////////////////////////////////////////////////////// + // Concurrent many-many partitioned queue. + // Because of random partitioning reordering possible - FIFO not guaranteed! + + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + class TRelaxedManyManyQueue: private TNonCopyable { + struct TQueueType: public TOneOneQueue<T, ChunkSize> { + union { + TAtomic WriteLock = 0; + char Pad1[PLATFORM_CACHE_LINE]; + }; + union { + TAtomic ReadLock = 0; + char Pad2[PLATFORM_CACHE_LINE]; + }; }; - TQueueType Queues[Concurrency]; - - public: - using TItem = T; - - template <typename TT> - void Enqueue(TT&& value) { - while (!TryEnqueue(std::forward<TT>(value))) { - SpinLockPause(); - } - } - - bool Dequeue(T& value) { - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[ReadPos++ % Concurrency]; - if (queue.Dequeue(value)) { - return true; + private: + TQueueType Queues[Concurrency]; + + public: + using TItem = T; + + template <typename TT> + void Enqueue(TT&& value) { + while (!TryEnqueue(std::forward<TT>(value))) { + SpinLockPause(); + } + } + + bool Dequeue(T& value) { + size_t readPos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[readPos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.ReadLock)) { + bool dequeued = queue.Dequeue(value); + AtomicUnlock(&queue.ReadLock); + if (dequeued) { + return true; + } } } - return false; + return false; } - bool IsEmpty() { - for (size_t i = 0; i < Concurrency; ++i) { - if (!Queues[i].IsEmpty()) { - return false; + bool IsEmpty() { + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[i]; + if (AtomicTryAndTryLock(&queue.ReadLock)) { + bool empty = queue.IsEmpty(); + AtomicUnlock(&queue.ReadLock); + if (!empty) { + return false; + } } } - return true; + return true; } - private: - template <typename TT> - bool TryEnqueue(TT&& value) { - size_t writePos = GetCycleCount(); - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { - queue.Enqueue(std::forward<TT>(value)); - AtomicUnlock(&queue.WriteLock); - return true; - } + private: + template <typename TT> + bool TryEnqueue(TT&& value) { + size_t writePos = GetCycleCount(); + for (size_t i = 0; i < Concurrency; ++i) { + TQueueType& queue = Queues[writePos++ % Concurrency]; + if (AtomicTryAndTryLock(&queue.WriteLock)) { + queue.Enqueue(std::forward<TT>(value)); + AtomicUnlock(&queue.WriteLock); + return true; + } } - return false; + return false; } - }; - - //////////////////////////////////////////////////////////////////////////////// - // Concurrent many-many partitioned queue. - // Because of random partitioning reordering possible - FIFO not guaranteed! - - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - class TRelaxedManyManyQueue: private TNonCopyable { - struct TQueueType: public TOneOneQueue<T, ChunkSize> { - union { - TAtomic WriteLock = 0; - char Pad1[PLATFORM_CACHE_LINE]; - }; - union { - TAtomic ReadLock = 0; - char Pad2[PLATFORM_CACHE_LINE]; - }; - }; + }; - private: - TQueueType Queues[Concurrency]; + //////////////////////////////////////////////////////////////////////////////// + // Simple wrapper to deal with AutoPtrs - public: - using TItem = T; + template <typename T, typename TImpl> + class TAutoQueueBase: private TNonCopyable { + private: + TImpl Impl; - template <typename TT> - void Enqueue(TT&& value) { - while (!TryEnqueue(std::forward<TT>(value))) { - SpinLockPause(); - } - } + public: + using TItem = TAutoPtr<T>; - bool Dequeue(T& value) { - size_t readPos = GetCycleCount(); - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[readPos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { - bool dequeued = queue.Dequeue(value); - AtomicUnlock(&queue.ReadLock); - if (dequeued) { - return true; - } - } - } - return false; + ~TAutoQueueBase() { + TAutoPtr<T> value; + while (Dequeue(value)) { + // do nothing + } } - bool IsEmpty() { - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { - bool empty = queue.IsEmpty(); - AtomicUnlock(&queue.ReadLock); - if (!empty) { - return false; - } - } - } - return true; - } - - private: - template <typename TT> - bool TryEnqueue(TT&& value) { - size_t writePos = GetCycleCount(); - for (size_t i = 0; i < Concurrency; ++i) { - TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { - queue.Enqueue(std::forward<TT>(value)); - AtomicUnlock(&queue.WriteLock); - return true; - } - } - return false; - } - }; - - //////////////////////////////////////////////////////////////////////////////// - // Simple wrapper to deal with AutoPtrs - - template <typename T, typename TImpl> - class TAutoQueueBase: private TNonCopyable { - private: - TImpl Impl; - - public: - using TItem = TAutoPtr<T>; - - ~TAutoQueueBase() { - TAutoPtr<T> value; - while (Dequeue(value)) { - // do nothing - } - } - - void Enqueue(TAutoPtr<T> value) { - Impl.Enqueue(value.Get()); + void Enqueue(TAutoPtr<T> value) { + Impl.Enqueue(value.Get()); Y_UNUSED(value.Release()); - } + } - bool Dequeue(TAutoPtr<T>& value) { - T* ptr = nullptr; - if (Impl.Dequeue(ptr)) { - value.Reset(ptr); - return true; - } - return false; + bool Dequeue(TAutoPtr<T>& value) { + T* ptr = nullptr; + if (Impl.Dequeue(ptr)) { + value.Reset(ptr); + return true; + } + return false; } - bool IsEmpty() { - return Impl.IsEmpty(); - } - }; + bool IsEmpty() { + return Impl.IsEmpty(); + } + }; - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoOneOneQueue = TAutoQueueBase<T, TOneOneQueue<T*, ChunkSize>>; - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoManyOneQueue = TAutoQueueBase<T, TManyOneQueue<T*, Concurrency, ChunkSize>>; - template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> - using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; + template <typename T, size_t ChunkSize = PLATFORM_PAGE_SIZE, typename TLock = TAdaptiveLock> + using TAutoManyManyQueue = TAutoQueueBase<T, TManyManyQueue<T*, ChunkSize, TLock>>; - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoRelaxedManyOneQueue = TAutoQueueBase<T, TRelaxedManyOneQueue<T*, Concurrency, ChunkSize>>; - template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> - using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; -} + template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> + using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; +} |