diff options
author | tobo <tobo@yandex-team.com> | 2025-05-30 15:22:41 +0300 |
---|---|---|
committer | tobo <tobo@yandex-team.com> | 2025-05-30 16:06:29 +0300 |
commit | 214c429243173c62ed6b95d778e9af2448cd2007 (patch) | |
tree | c3f48ec4f9180bc0c3ba47e429f03cea2d9a7fe4 /library/cpp | |
parent | 3c5cbc7d5e3754691f6f18de2f68ff3637ff71f7 (diff) | |
download | ydb-214c429243173c62ed6b95d778e9af2448cd2007.tar.gz |
AtomicTryLock() / AtomicUnlock() => TSpinLock
commit_hash:d6f16e427045049e4e5815d09a949cc721a20c79
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.h | 44 |
1 files changed, 17 insertions, 27 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index feee776b872..fdde719e503 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -4,13 +4,13 @@ #include <util/generic/noncopyable.h> #include <util/generic/ptr.h> #include <util/generic/typetraits.h> -#include <util/generic/vector.h> #include <util/generic/ylimits.h> #include <library/cpp/deprecated/atomic/atomic.h> #include <util/system/guard.h> #include <util/system/spinlock.h> #include <util/system/yassert.h> +#include <atomic> #include <type_traits> #include <utility> @@ -240,7 +240,7 @@ namespace NThreading { }; struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> { - TAtomic WriteLock = 0; + TSpinLock WriteLock; using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite; using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite; @@ -250,11 +250,7 @@ namespace NThreading { }; private: - union { - TAtomic WriteTag = 0; - char Pad[PLATFORM_CACHE_LINE]; - }; - + TPadded<std::atomic<ui64>> WriteTag{1}; TQueueType Queues[Concurrency]; public: @@ -291,20 +287,20 @@ namespace NThreading { ui64 NextTag() { // TODO: can we avoid synchronization here? it costs 1.5x performance penalty // return GetCycleCount(); - return AtomicIncrement(WriteTag); + return WriteTag.fetch_add(1); } template <typename TT> bool TryEnqueue(TT&& value, ui64 tag) { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { + if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { TEntry* entry = queue.PrepareWrite(); Y_ASSERT(entry); TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); entry->Tag = tag; queue.CompleteWrite(); - AtomicUnlock(&queue.WriteLock); + queue.WriteLock.Release(); return true; } } @@ -383,7 +379,7 @@ namespace NThreading { template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TRelaxedManyOneQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue<T, ChunkSize> { - TAtomic WriteLock = 0; + TSpinLock WriteLock; }; private: @@ -429,9 +425,9 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { + if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { queue.Enqueue(std::forward<TT>(value)); - AtomicUnlock(&queue.WriteLock); + queue.WriteLock.Release(); return true; } } @@ -446,14 +442,8 @@ namespace NThreading { template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> class TRelaxedManyManyQueue: private TNonCopyable { struct TQueueType: public TOneOneQueue<T, ChunkSize> { - union { - TAtomic WriteLock = 0; - char Pad1[PLATFORM_CACHE_LINE]; - }; - union { - TAtomic ReadLock = 0; - char Pad2[PLATFORM_CACHE_LINE]; - }; + TPadded<TSpinLock> WriteLock; + TPadded<TSpinLock> ReadLock; }; private: @@ -473,9 +463,9 @@ namespace NThreading { size_t readPos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[readPos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { + if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { bool dequeued = queue.Dequeue(value); - AtomicUnlock(&queue.ReadLock); + queue.ReadLock.Release(); if (dequeued) { return true; } @@ -487,9 +477,9 @@ namespace NThreading { bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (AtomicTryAndTryLock(&queue.ReadLock)) { + if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { bool empty = queue.IsEmpty(); - AtomicUnlock(&queue.ReadLock); + queue.ReadLock.Release(); if (!empty) { return false; } @@ -504,9 +494,9 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (AtomicTryAndTryLock(&queue.WriteLock)) { + if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { queue.Enqueue(std::forward<TT>(value)); - AtomicUnlock(&queue.WriteLock); + queue.WriteLock.Release(); return true; } } |