diff options
| author | kulikov <[email protected]> | 2026-03-20 19:39:06 +0300 |
|---|---|---|
| committer | kulikov <[email protected]> | 2026-03-20 20:17:51 +0300 |
| commit | 14bf6e9ab2e2a49e7b066904080cf9e121a348d9 (patch) | |
| tree | 723aeeaacfc85442b14485526ccf342169640cf0 /library/cpp/threading/equeue | |
| parent | 8c1168348ffdcc290ddee600735708101b5f708c (diff) | |
Switch to std atomics
commit_hash:5d980b19ed177f3a4ce03ba7c7d89ab9d711b8e8
Diffstat (limited to 'library/cpp/threading/equeue')
| -rw-r--r-- | library/cpp/threading/equeue/equeue.cpp | 14 | ||||
| -rw-r--r-- | library/cpp/threading/equeue/equeue.h | 7 | ||||
| -rw-r--r-- | library/cpp/threading/equeue/equeue_ut.cpp | 30 | ||||
| -rw-r--r-- | library/cpp/threading/equeue/ya.make | 4 |
4 files changed, 26 insertions, 29 deletions
diff --git a/library/cpp/threading/equeue/equeue.cpp b/library/cpp/threading/equeue/equeue.cpp index 54a848e912a..54b088635d5 100644 --- a/library/cpp/threading/equeue/equeue.cpp +++ b/library/cpp/threading/equeue/equeue.cpp @@ -6,12 +6,12 @@ TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue) } size_t TElasticQueue::ObjectCount() const { - return (size_t)AtomicGet(ObjectCount_); + return ObjectCount_.load(); } bool TElasticQueue::TryIncCounter() { - if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) { - AtomicDecrement(GuardCount_); + if (++GuardCount_ > MaxQueueSize_) { + --GuardCount_; return false; } @@ -26,12 +26,12 @@ public: : RealObject_(realObject) , Queue_(queue) { - AtomicIncrement(Queue_->ObjectCount_); + ++Queue_->ObjectCount_; } ~TDecrementingWrapper() override { - AtomicDecrement(Queue_->ObjectCount_); - AtomicDecrement(Queue_->GuardCount_); + --Queue_->ObjectCount_; + --Queue_->GuardCount_; } private: void Process(void *tsr) override { @@ -54,7 +54,7 @@ bool TElasticQueue::Add(IObjectInQueue* obj) { try { wrapper.Reset(new TDecrementingWrapper(obj, this)); } catch (...) { - AtomicDecrement(GuardCount_); + --GuardCount_; throw; } diff --git a/library/cpp/threading/equeue/equeue.h b/library/cpp/threading/equeue/equeue.h index c61b9f7b857..4d03ad051db 100644 --- a/library/cpp/threading/equeue/equeue.h +++ b/library/cpp/threading/equeue/equeue.h @@ -1,9 +1,10 @@ #pragma once #include <util/thread/pool.h> -#include <library/cpp/deprecated/atomic/atomic.h> #include <util/generic/ptr.h> +#include <atomic> + //actual queue limit will be (maxQueueSize - numBusyThreads) or 0 class TElasticQueue: public IThreadPool { public: @@ -23,6 +24,6 @@ private: private: THolder<IThreadPool> SlaveQueue_; size_t MaxQueueSize_ = 0; - TAtomic ObjectCount_ = 0; - TAtomic GuardCount_ = 0; + std::atomic<size_t> ObjectCount_ = 0; + std::atomic<size_t> GuardCount_ = 0; }; diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index 47b1029a2f9..0b66128c73d 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -44,10 +44,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { Processed = Scheduled = Discarded = Total = 0; } - TAtomic Processed; - TAtomic Scheduled; - TAtomic Discarded; - TAtomic Total; + std::atomic<size_t> Processed; + std::atomic<size_t> Scheduled; + std::atomic<size_t> Discarded; + std::atomic<size_t> Total; }; static TCounters Counters; @@ -61,7 +61,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { struct TWaitJob: public IObjectInQueue { void Process(void*) override { WaitEvent.Wait(); - AtomicIncrement(Counters.Processed); + ++Counters.Processed; } } job; @@ -87,7 +87,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount()); UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size()); - UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); + UNIT_ASSERT_VALUES_EQUAL(Counters.Processed.load(), enqueued); } Y_UNIT_TEST(FillTest) { @@ -101,25 +101,25 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { //concurrent test -- send many jobs from different threads struct TJob: public IObjectInQueue { void Process(void*) override { - AtomicIncrement(Counters.Processed); + ++Counters.Processed; } }; static TJob Job; template <typename T> static bool TryAdd() { - AtomicIncrement(Counters.Total); + ++Counters.Total; if (TEnv<T>::Queue->Add(&Job)) { - AtomicIncrement(Counters.Scheduled); + ++Counters.Scheduled; return true; } else { - AtomicIncrement(Counters.Discarded); + ++Counters.Discarded; return false; } } const size_t N = 100000; - static size_t TryCounter; + static std::atomic<size_t> TryCounter; template <typename T> void ConcurrentTest() { @@ -128,7 +128,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { - while ((size_t)AtomicIncrement(TryCounter) <= N) { + while (++TryCounter <= N) { if (!TryAdd<T>()) { Sleep(TDuration::MicroSeconds(50)); } @@ -149,9 +149,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } - UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N); - UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); - UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); + UNIT_ASSERT_VALUES_EQUAL(Counters.Total.load(), N); + UNIT_ASSERT_VALUES_EQUAL(Counters.Processed.load(), Counters.Scheduled.load()); + UNIT_ASSERT_VALUES_EQUAL(Counters.Total.load(), Counters.Scheduled.load() + Counters.Discarded.load()); } Y_UNIT_TEST(ConcurrentTest) { diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make index 445797aa121..e51702f8225 100644 --- a/library/cpp/threading/equeue/ya.make +++ b/library/cpp/threading/equeue/ya.make @@ -5,10 +5,6 @@ SRCS( equeue.cpp ) -PEERDIR( - library/cpp/deprecated/atomic -) - END() RECURSE_FOR_TESTS( |
