aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authornae202 <nae202@yandex-team.com>2025-06-04 00:46:32 +0300
committernae202 <nae202@yandex-team.com>2025-06-04 01:00:07 +0300
commit4e19d625f6ee3bfd94071094f06eefc0546a6d03 (patch)
treeda61a31765409ca7f5ebe432d0391d25405b1027 /library/cpp
parent5a0c4130701ca71b820a48ccb151b3bde804bda0 (diff)
downloadydb-4e19d625f6ee3bfd94071094f06eefc0546a6d03.tar.gz
TTryGuard for queue lock
commit_hash:027b7f086dcb0c574896ea5bd2c4958bf82914cf
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/threading/chunk_queue/queue.h97
1 files changed, 58 insertions, 39 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
index 17ebef91a86..7cb4be4b4ef 100644
--- a/library/cpp/threading/chunk_queue/queue.h
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -1,11 +1,12 @@
#pragma once
+#include <library/cpp/deprecated/atomic/atomic.h> // AtomicGet
+
#include <util/datetime/base.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/generic/typetraits.h>
#include <util/generic/ylimits.h>
-#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>
@@ -13,15 +14,15 @@
#include <atomic>
namespace NThreading {
-////////////////////////////////////////////////////////////////////////////////
-// Platform helpers
+ ////////////////////////////////////////////////////////////////////////////////
+ // Platform helpers
#if !defined(PLATFORM_CACHE_LINE)
-#define PLATFORM_CACHE_LINE 64
+ #define PLATFORM_CACHE_LINE 64
#endif
#if !defined(PLATFORM_PAGE_SIZE)
-#define PLATFORM_PAGE_SIZE 4 * 1024
+ #define PLATFORM_PAGE_SIZE 4 * 1024
#endif
template <typename T, size_t PadSize = PLATFORM_CACHE_LINE>
@@ -33,7 +34,7 @@ namespace NThreading {
Y_UNUSED(Pad);
}
- template<typename... Args>
+ template <typename... Args>
TPadded(Args&&... args)
: T(std::forward<Args>(args)...)
{
@@ -85,7 +86,7 @@ namespace NThreading {
TPodTypeHelper<T>,
TNonPodTypeHelper<T>>;
- }
+ } // namespace NImpl
////////////////////////////////////////////////////////////////////////////////
// One producer/one consumer chunked queue.
@@ -292,15 +293,19 @@ namespace NThreading {
bool TryEnqueue(TT&& value, ui64 tag) {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[i];
- if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
- TEntry* entry = queue.PrepareWrite();
- Y_ASSERT(entry);
- TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
- entry->Tag = tag;
- queue.CompleteWrite();
- queue.WriteLock.Release();
- return true;
+ if (queue.WriteLock.IsLocked()) {
+ continue;
}
+ TTryGuard guard{queue.WriteLock};
+ if (!guard) {
+ continue;
+ }
+ TEntry* entry = queue.PrepareWrite();
+ Y_ASSERT(entry);
+ TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
+ entry->Tag = tag;
+ queue.CompleteWrite();
+ return true;
}
return false;
}
@@ -423,11 +428,15 @@ namespace NThreading {
size_t writePos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[writePos++ % Concurrency];
- if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
- queue.Enqueue(std::forward<TT>(value));
- queue.WriteLock.Release();
- return true;
+ if (queue.WriteLock.IsLocked()) {
+ continue;
+ }
+ TTryGuard guard{queue.WriteLock};
+ if (!guard) {
+ continue;
}
+ queue.Enqueue(std::forward<TT>(value));
+ return true;
}
return false;
}
@@ -461,12 +470,15 @@ namespace NThreading {
size_t readPos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[readPos++ % Concurrency];
- if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) {
- bool dequeued = queue.Dequeue(value);
- queue.ReadLock.Release();
- if (dequeued) {
- return true;
- }
+ if (queue.ReadLock.IsLocked()) {
+ continue;
+ }
+ TTryGuard guard{queue.ReadLock};
+ if (!guard) {
+ continue;
+ }
+ if (queue.Dequeue(value)) {
+ return true;
}
}
return false;
@@ -475,12 +487,15 @@ namespace NThreading {
bool IsEmpty() {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[i];
- if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) {
- bool empty = queue.IsEmpty();
- queue.ReadLock.Release();
- if (!empty) {
- return false;
- }
+ if (queue.ReadLock.IsLocked()) {
+ continue;
+ }
+ TTryGuard guard{queue.ReadLock};
+ if (!guard) {
+ continue;
+ }
+ if (!queue.IsEmpty()) {
+ return false;
}
}
return true;
@@ -492,11 +507,15 @@ namespace NThreading {
size_t writePos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[writePos++ % Concurrency];
- if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
- queue.Enqueue(std::forward<TT>(value));
- queue.WriteLock.Release();
- return true;
+ if (queue.WriteLock.IsLocked()) {
+ continue;
}
+ TTryGuard guard{queue.WriteLock};
+ if (!guard) {
+ continue;
+ }
+ queue.Enqueue(std::forward<TT>(value));
+ return true;
}
return false;
}
@@ -514,18 +533,18 @@ namespace NThreading {
using TItem = TAutoPtr<T>;
~TAutoQueueBase() {
- TAutoPtr<T> value;
+ TItem value;
while (Dequeue(value)) {
// do nothing
}
}
- void Enqueue(TAutoPtr<T> value) {
+ void Enqueue(TItem value) {
Impl.Enqueue(value.Get());
Y_UNUSED(value.Release());
}
- bool Dequeue(TAutoPtr<T>& value) {
+ bool Dequeue(TItem& value) {
T* ptr = nullptr;
if (Impl.Dequeue(ptr)) {
value.Reset(ptr);
@@ -553,4 +572,4 @@ namespace NThreading {
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
using TAutoRelaxedManyManyQueue = TAutoQueueBase<T, TRelaxedManyManyQueue<T*, Concurrency, ChunkSize>>;
-}
+} // namespace NThreading