diff options
author | nae202 <nae202@yandex-team.com> | 2025-06-04 00:46:32 +0300 |
---|---|---|
committer | nae202 <nae202@yandex-team.com> | 2025-06-04 01:00:07 +0300 |
commit | 4e19d625f6ee3bfd94071094f06eefc0546a6d03 (patch) | |
tree | da61a31765409ca7f5ebe432d0391d25405b1027 /library/cpp | |
parent | 5a0c4130701ca71b820a48ccb151b3bde804bda0 (diff) | |
download | ydb-4e19d625f6ee3bfd94071094f06eefc0546a6d03.tar.gz |
TTryGuard for queue lock
commit_hash:027b7f086dcb0c574896ea5bd2c4958bf82914cf
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/threading/chunk_queue/queue.h | 97 |
1 files changed, 58 insertions, 39 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h index 17ebef91a86..7cb4be4b4ef 100644 --- a/library/cpp/threading/chunk_queue/queue.h +++ b/library/cpp/threading/chunk_queue/queue.h @@ -1,11 +1,12 @@ #pragma once +#include <library/cpp/deprecated/atomic/atomic.h> // AtomicGet + #include <util/datetime/base.h> #include <util/generic/noncopyable.h> #include <util/generic/ptr.h> #include <util/generic/typetraits.h> #include <util/generic/ylimits.h> -#include <library/cpp/deprecated/atomic/atomic.h> #include <util/system/guard.h> #include <util/system/spinlock.h> #include <util/system/yassert.h> @@ -13,15 +14,15 @@ #include <atomic> namespace NThreading { -//////////////////////////////////////////////////////////////////////////////// -// Platform helpers + //////////////////////////////////////////////////////////////////////////////// + // Platform helpers #if !defined(PLATFORM_CACHE_LINE) -#define PLATFORM_CACHE_LINE 64 + #define PLATFORM_CACHE_LINE 64 #endif #if !defined(PLATFORM_PAGE_SIZE) -#define PLATFORM_PAGE_SIZE 4 * 1024 + #define PLATFORM_PAGE_SIZE 4 * 1024 #endif template <typename T, size_t PadSize = PLATFORM_CACHE_LINE> @@ -33,7 +34,7 @@ namespace NThreading { Y_UNUSED(Pad); } - template<typename... Args> + template <typename... Args> TPadded(Args&&... args) : T(std::forward<Args>(args)...) { @@ -85,7 +86,7 @@ namespace NThreading { TPodTypeHelper<T>, TNonPodTypeHelper<T>>; - } + } // namespace NImpl //////////////////////////////////////////////////////////////////////////////// // One producer/one consumer chunked queue. @@ -292,15 +293,19 @@ namespace NThreading { bool TryEnqueue(TT&& value, ui64 tag) { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { - TEntry* entry = queue.PrepareWrite(); - Y_ASSERT(entry); - TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); - entry->Tag = tag; - queue.CompleteWrite(); - queue.WriteLock.Release(); - return true; + if (queue.WriteLock.IsLocked()) { + continue; } + TTryGuard guard{queue.WriteLock}; + if (!guard) { + continue; + } + TEntry* entry = queue.PrepareWrite(); + Y_ASSERT(entry); + TTypeHelper::Write(&entry->Value, std::forward<TT>(value)); + entry->Tag = tag; + queue.CompleteWrite(); + return true; } return false; } @@ -423,11 +428,15 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { - queue.Enqueue(std::forward<TT>(value)); - queue.WriteLock.Release(); - return true; + if (queue.WriteLock.IsLocked()) { + continue; + } + TTryGuard guard{queue.WriteLock}; + if (!guard) { + continue; } + queue.Enqueue(std::forward<TT>(value)); + return true; } return false; } @@ -461,12 +470,15 @@ namespace NThreading { size_t readPos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[readPos++ % Concurrency]; - if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { - bool dequeued = queue.Dequeue(value); - queue.ReadLock.Release(); - if (dequeued) { - return true; - } + if (queue.ReadLock.IsLocked()) { + continue; + } + TTryGuard guard{queue.ReadLock}; + if (!guard) { + continue; + } + if (queue.Dequeue(value)) { + return true; } } return false; @@ -475,12 +487,15 @@ namespace NThreading { bool IsEmpty() { for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[i]; - if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) { - bool empty = queue.IsEmpty(); - queue.ReadLock.Release(); - if (!empty) { - return false; - } + if (queue.ReadLock.IsLocked()) { + continue; + } + TTryGuard guard{queue.ReadLock}; + if (!guard) { + continue; + } + if (!queue.IsEmpty()) { + return false; } } return true; @@ -492,11 +507,15 @@ namespace NThreading { size_t writePos = GetCycleCount(); for (size_t i = 0; i < Concurrency; ++i) { TQueueType& queue = Queues[writePos++ % Concurrency]; - if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) { - queue.Enqueue(std::forward<TT>(value)); - queue.WriteLock.Release(); - return true; + if (queue.WriteLock.IsLocked()) { + continue; } + TTryGuard guard{queue.WriteLock}; + if (!guard) { + continue; + } + queue.Enqueue(std::forward<TT>(value)); + return true; } return false; } @@ -514,18 +533,18 @@ namespace NThreading { using TItem = TAutoPtr<T>; ~TAutoQueueBase() { - TAutoPtr<T> value; + TItem value; while (Dequeue(value)) { // do nothing } } - void Enqueue(TAutoPtr<T> value) { + void Enqueue(TItem value) { Impl.Enqueue(value.Get()); Y_UNUSED(value.Release()); } - bool Dequeue(TAutoPtr<T>& value) { + bool Dequeue(TItem& value) { T* ptr = nullptr; if (Impl.Dequeue(ptr)) { value.Reset(ptr); @@ -553,4 +572,4 @@ namespace NThreading { template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE> using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>; -} +} // namespace NThreading |