From 214c429243173c62ed6b95d778e9af2448cd2007 Mon Sep 17 00:00:00 2001 From: tobo Date: Fri, 30 May 2025 15:22:41 +0300 Subject: AtomicTryLock() / AtomicUnlock() => TSpinLock commit_hash:d6f16e427045049e4e5815d09a949cc721a20c79 --- library/cpp/threading/chunk_queue/queue.h | 44 ++++++++++++------------------- 1 file changed, 17 insertions(+), 27 deletions(-) (limited to 'library/cpp/threading/chunk_queue/queue.h') diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index feee776b872..fdde719e503 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -4,13 +4,13 @@ #include #include #include -#include #include #include #include #include #include +#include #include #include @@ -240,7 +240,7 @@ namespace NThreading { }; struct TQueueType: public TOneOneQueue { - TAtomic WriteLock = 0; + TSpinLock WriteLock; using TOneOneQueue::PrepareWrite; using TOneOneQueue::CompleteWrite; @@ -250,11 +250,7 @@ namespace NThreading { }; private: - union { - TAtomic WriteTag = 0; - char Pad[PLATFORM_CACHE_LINE]; - }; - + TPadded> WriteTag{1}; TQueueType Queues[Concurrency]; public: @@ -291,20 +287,20 @@ namespace NThreading { ui64 NextTag() { // TODO: can we avoid synchronization here? it costs 1.5x performance penalty // return GetCycleCount(); - return AtomicIncrement(WriteTag); + return WriteTag.fetch_add(1); } template bool TryEnqueue(TT&& value, ui64 tag) { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { + if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { TEntry* entry = queue.PrepareWrite(); Y_ASSERT(entry); TTypeHelper::Write(&entry->Value, std::forward(value)); entry->Tag = tag; queue.CompleteWrite(); - AtomicUnlock(&queue.WriteLock); + queue.WriteLock.Release(); return true; } } @@ -383,7 +379,7 @@ namespace NThreading { template class TRelaxedManyOneQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue { - TAtomic WriteLock = 0; + TSpinLock WriteLock; }; private: @@ -429,9 +425,9 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { + if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { queue.Enqueue(std::forward(value)); - AtomicUnlock(&queue.WriteLock); + queue.WriteLock.Release(); return true; } } @@ -446,14 +442,8 @@ namespace NThreading { template class TRelaxedManyManyQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue { - union { - TAtomic WriteLock = 0; - char Pad1[PLATFORM_CACHE_LINE]; - }; - union { - TAtomic ReadLock = 0; - char Pad2[PLATFORM_CACHE_LINE]; - }; + TPadded WriteLock; + TPadded ReadLock; }; private: @@ -473,9 +463,9 @@ namespace NThreading { size_t readPos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[readPos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { + if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { bool dequeued = queue.Dequeue(value); - AtomicUnlock(&queue.ReadLock); + queue.ReadLock.Release(); if (dequeued) { return true; } @@ -487,9 +477,9 @@ namespace NThreading { bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { + if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { bool empty = queue.IsEmpty(); - AtomicUnlock(&queue.ReadLock); + queue.ReadLock.Release(); if (!empty) { return false; } @@ -504,9 +494,9 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { + if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { queue.Enqueue(std::forward(value)); - AtomicUnlock(&queue.WriteLock); + queue.WriteLock.Release(); return true; } } -- cgit v1.3