aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authortobo <tobo@yandex-team.com>2025-05-30 15:22:41 +0300
committertobo <tobo@yandex-team.com>2025-05-30 16:06:29 +0300
commit214c429243173c62ed6b95d778e9af2448cd2007 (patch)
treec3f48ec4f9180bc0c3ba47e429f03cea2d9a7fe4 /library/cpp
parent3c5cbc7d5e3754691f6f18de2f68ff3637ff71f7 (diff)
downloadydb-214c429243173c62ed6b95d778e9af2448cd2007.tar.gz
AtomicTryLock() / AtomicUnlock() => TSpinLock
commit_hash:d6f16e427045049e4e5815d09a949cc721a20c79
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/threading/chunk_queue/queue.h44
1 files changed, 17 insertions, 27 deletions
diff --git a/library/cpp/threading/chunk_queue/queue.h b/library/cpp/threading/chunk_queue/queue.h
index feee776b872..fdde719e503 100644
--- a/library/cpp/threading/chunk_queue/queue.h
+++ b/library/cpp/threading/chunk_queue/queue.h
@@ -4,13 +4,13 @@
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/generic/typetraits.h>
-#include <util/generic/vector.h>
#include <util/generic/ylimits.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>
+#include <atomic>
#include <type_traits>
#include <utility>
@@ -240,7 +240,7 @@ namespace NThreading {
};
struct TQueueType: public TOneOneQueue<TEntry, ChunkSize> {
- TAtomic WriteLock = 0;
+ TSpinLock WriteLock;
using TOneOneQueue<TEntry, ChunkSize>::PrepareWrite;
using TOneOneQueue<TEntry, ChunkSize>::CompleteWrite;
@@ -250,11 +250,7 @@ namespace NThreading {
};
private:
- union {
- TAtomic WriteTag = 0;
- char Pad[PLATFORM_CACHE_LINE];
- };
-
+ TPadded<std::atomic<ui64>> WriteTag{1};
TQueueType Queues[Concurrency];
public:
@@ -291,20 +287,20 @@ namespace NThreading {
ui64 NextTag() {
// TODO: can we avoid synchronization here? it costs 1.5x performance penalty
// return GetCycleCount();
- return AtomicIncrement(WriteTag);
+ return WriteTag.fetch_add(1);
}
template <typename TT>
bool TryEnqueue(TT&& value, ui64 tag) {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
TEntry* entry = queue.PrepareWrite();
Y_ASSERT(entry);
TTypeHelper::Write(&entry->Value, std::forward<TT>(value));
entry->Tag = tag;
queue.CompleteWrite();
- AtomicUnlock(&queue.WriteLock);
+ queue.WriteLock.Release();
return true;
}
}
@@ -383,7 +379,7 @@ namespace NThreading {
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
class TRelaxedManyOneQueue: private TNonCopyable {
struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- TAtomic WriteLock = 0;
+ TSpinLock WriteLock;
};
private:
@@ -429,9 +425,9 @@ namespace NThreading {
size_t writePos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
+ queue.WriteLock.Release();
return true;
}
}
@@ -446,14 +442,8 @@ namespace NThreading {
template <typename T, size_t Concurrency = 4, size_t ChunkSize = PLATFORM_PAGE_SIZE>
class TRelaxedManyManyQueue: private TNonCopyable {
struct TQueueType: public TOneOneQueue<T, ChunkSize> {
- union {
- TAtomic WriteLock = 0;
- char Pad1[PLATFORM_CACHE_LINE];
- };
- union {
- TAtomic ReadLock = 0;
- char Pad2[PLATFORM_CACHE_LINE];
- };
+ TPadded<TSpinLock> WriteLock;
+ TPadded<TSpinLock> ReadLock;
};
private:
@@ -473,9 +463,9 @@ namespace NThreading {
size_t readPos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[readPos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) {
bool dequeued = queue.Dequeue(value);
- AtomicUnlock(&queue.ReadLock);
+ queue.ReadLock.Release();
if (dequeued) {
return true;
}
@@ -487,9 +477,9 @@ namespace NThreading {
bool IsEmpty() {
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[i];
- if (AtomicTryAndTryLock(&queue.ReadLock)) {
+ if (!queue.ReadLock.IsLocked() && queue.ReadLock.TryAcquire()) {
bool empty = queue.IsEmpty();
- AtomicUnlock(&queue.ReadLock);
+ queue.ReadLock.Release();
if (!empty) {
return false;
}
@@ -504,9 +494,9 @@ namespace NThreading {
size_t writePos = GetCycleCount();
for (size_t i = 0; i < Concurrency; ++i) {
TQueueType& queue = Queues[writePos++ % Concurrency];
- if (AtomicTryAndTryLock(&queue.WriteLock)) {
+ if (!queue.WriteLock.IsLocked() && queue.WriteLock.TryAcquire()) {
queue.Enqueue(std::forward<TT>(value));
- AtomicUnlock(&queue.WriteLock);
+ queue.WriteLock.Release();
return true;
}
}