summaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/equeue
diff options
context:
space:
mode:
authorkulikov <[email protected]>2026-03-20 19:39:06 +0300
committerkulikov <[email protected]>2026-03-20 20:17:51 +0300
commit14bf6e9ab2e2a49e7b066904080cf9e121a348d9 (patch)
tree723aeeaacfc85442b14485526ccf342169640cf0 /library/cpp/threading/equeue
parent8c1168348ffdcc290ddee600735708101b5f708c (diff)
Switch to std atomics
commit_hash:5d980b19ed177f3a4ce03ba7c7d89ab9d711b8e8
Diffstat (limited to 'library/cpp/threading/equeue')
-rw-r--r--library/cpp/threading/equeue/equeue.cpp14
-rw-r--r--library/cpp/threading/equeue/equeue.h7
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp30
-rw-r--r--library/cpp/threading/equeue/ya.make4
4 files changed, 26 insertions, 29 deletions
diff --git a/library/cpp/threading/equeue/equeue.cpp b/library/cpp/threading/equeue/equeue.cpp
index 54a848e912a..54b088635d5 100644
--- a/library/cpp/threading/equeue/equeue.cpp
+++ b/library/cpp/threading/equeue/equeue.cpp
@@ -6,12 +6,12 @@ TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue)
}
size_t TElasticQueue::ObjectCount() const {
- return (size_t)AtomicGet(ObjectCount_);
+ return ObjectCount_.load();
}
bool TElasticQueue::TryIncCounter() {
- if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) {
- AtomicDecrement(GuardCount_);
+ if (++GuardCount_ > MaxQueueSize_) {
+ --GuardCount_;
return false;
}
@@ -26,12 +26,12 @@ public:
: RealObject_(realObject)
, Queue_(queue)
{
- AtomicIncrement(Queue_->ObjectCount_);
+ ++Queue_->ObjectCount_;
}
~TDecrementingWrapper() override {
- AtomicDecrement(Queue_->ObjectCount_);
- AtomicDecrement(Queue_->GuardCount_);
+ --Queue_->ObjectCount_;
+ --Queue_->GuardCount_;
}
private:
void Process(void *tsr) override {
@@ -54,7 +54,7 @@ bool TElasticQueue::Add(IObjectInQueue* obj) {
try {
wrapper.Reset(new TDecrementingWrapper(obj, this));
} catch (...) {
- AtomicDecrement(GuardCount_);
+ --GuardCount_;
throw;
}
diff --git a/library/cpp/threading/equeue/equeue.h b/library/cpp/threading/equeue/equeue.h
index c61b9f7b857..4d03ad051db 100644
--- a/library/cpp/threading/equeue/equeue.h
+++ b/library/cpp/threading/equeue/equeue.h
@@ -1,9 +1,10 @@
#pragma once
#include <util/thread/pool.h>
-#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/generic/ptr.h>
+#include <atomic>
+
//actual queue limit will be (maxQueueSize - numBusyThreads) or 0
class TElasticQueue: public IThreadPool {
public:
@@ -23,6 +24,6 @@ private:
private:
THolder<IThreadPool> SlaveQueue_;
size_t MaxQueueSize_ = 0;
- TAtomic ObjectCount_ = 0;
- TAtomic GuardCount_ = 0;
+ std::atomic<size_t> ObjectCount_ = 0;
+ std::atomic<size_t> GuardCount_ = 0;
};
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 47b1029a2f9..0b66128c73d 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -44,10 +44,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
Processed = Scheduled = Discarded = Total = 0;
}
- TAtomic Processed;
- TAtomic Scheduled;
- TAtomic Discarded;
- TAtomic Total;
+ std::atomic<size_t> Processed;
+ std::atomic<size_t> Scheduled;
+ std::atomic<size_t> Discarded;
+ std::atomic<size_t> Total;
};
static TCounters Counters;
@@ -61,7 +61,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
struct TWaitJob: public IObjectInQueue {
void Process(void*) override {
WaitEvent.Wait();
- AtomicIncrement(Counters.Processed);
+ ++Counters.Processed;
}
} job;
@@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
- UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
+ UNIT_ASSERT_VALUES_EQUAL(Counters.Processed.load(), enqueued);
}
Y_UNIT_TEST(FillTest) {
@@ -101,25 +101,25 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
//concurrent test -- send many jobs from different threads
struct TJob: public IObjectInQueue {
void Process(void*) override {
- AtomicIncrement(Counters.Processed);
+ ++Counters.Processed;
}
};
static TJob Job;
template <typename T>
static bool TryAdd() {
- AtomicIncrement(Counters.Total);
+ ++Counters.Total;
if (TEnv<T>::Queue->Add(&Job)) {
- AtomicIncrement(Counters.Scheduled);
+ ++Counters.Scheduled;
return true;
} else {
- AtomicIncrement(Counters.Discarded);
+ ++Counters.Discarded;
return false;
}
}
const size_t N = 100000;
- static size_t TryCounter;
+ static std::atomic<size_t> TryCounter;
template <typename T>
void ConcurrentTest() {
@@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
- while ((size_t)AtomicIncrement(TryCounter) <= N) {
+ while (++TryCounter <= N) {
if (!TryAdd<T>()) {
Sleep(TDuration::MicroSeconds(50));
}
@@ -149,9 +149,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
}
- UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N);
- UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
- UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
+ UNIT_ASSERT_VALUES_EQUAL(Counters.Total.load(), N);
+ UNIT_ASSERT_VALUES_EQUAL(Counters.Processed.load(), Counters.Scheduled.load());
+ UNIT_ASSERT_VALUES_EQUAL(Counters.Total.load(), Counters.Scheduled.load() + Counters.Discarded.load());
}
Y_UNIT_TEST(ConcurrentTest) {
diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make
index 445797aa121..e51702f8225 100644
--- a/library/cpp/threading/equeue/ya.make
+++ b/library/cpp/threading/equeue/ya.make
@@ -5,10 +5,6 @@ SRCS(
equeue.cpp
)
-PEERDIR(
- library/cpp/deprecated/atomic
-)
-
END()
RECURSE_FOR_TESTS(