summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <[email protected]>2024-04-10 16:58:00 +0300
committerGitHub <[email protected]>2024-04-10 16:58:00 +0300
commitb0ef85ebd550677bc732c050f0aeba245cfda17e (patch)
treec536164f5fc71cb17f0a46420d196a4538c246c5
parenta19b5f1a1be3a3102498effdee8f0e3e2f9c81d5 (diff)
Add basic multithread tests fot MPMCRingQueue (#3631)
-rw-r--r--ydb/library/actors/queues/mpmc_ring_queue.cpp3
-rw-r--r--ydb/library/actors/queues/mpmc_ring_queue.h97
-rw-r--r--ydb/library/actors/queues/mpmc_ring_queue_ut_base.h50
-rw-r--r--ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp317
-rw-r--r--ydb/library/actors/queues/mpmc_ring_queue_ut_single_thread.cpp (renamed from ydb/library/actors/queues/mpmc_ring_queue_ut.cpp)87
-rw-r--r--ydb/library/actors/queues/ut/ya.make3
-rw-r--r--ydb/library/actors/queues/ya.make2
7 files changed, 470 insertions, 89 deletions
diff --git a/ydb/library/actors/queues/mpmc_ring_queue.cpp b/ydb/library/actors/queues/mpmc_ring_queue.cpp
new file mode 100644
index 00000000000..df186ee66e7
--- /dev/null
+++ b/ydb/library/actors/queues/mpmc_ring_queue.cpp
@@ -0,0 +1,3 @@
+#include "mpmc_ring_queue.h"
+
+thread_local NActors::TMPMCRingQueueStats::TStats NActors::TMPMCRingQueueStats::Stats;
diff --git a/ydb/library/actors/queues/mpmc_ring_queue.h b/ydb/library/actors/queues/mpmc_ring_queue.h
index 49466478698..45bc4950fd8 100644
--- a/ydb/library/actors/queues/mpmc_ring_queue.h
+++ b/ydb/library/actors/queues/mpmc_ring_queue.h
@@ -2,12 +2,79 @@
#include "defs.h"
#include <library/cpp/threading/chunk_queue/queue.h>
+#include <util/system/mutex.h>
#include <atomic>
#include <optional>
namespace NActors {
+struct TMPMCRingQueueStats {
+
+ struct TStats {
+ ui64 SuccessPushes = 0;
+ ui64 SuccessSlowPushes = 0;
+ ui64 SuccessFastPushes = 0;
+ ui64 FailedPushes = 0;
+ ui64 FailedSlowPushAttempts = 0;
+
+ ui64 ChangesFastPushToSlowPush = 0;
+
+ TStats& operator += (const TStats &other) {
+ SuccessPushes += other.SuccessPushes;
+ SuccessSlowPushes += other.SuccessSlowPushes;
+ SuccessFastPushes += other.SuccessFastPushes;
+ ChangesFastPushToSlowPush += other.ChangesFastPushToSlowPush;
+ FailedPushes += other.FailedPushes;
+ FailedSlowPushAttempts += other.FailedSlowPushAttempts;
+ return *this;
+ }
+ };
+ static thread_local TStats Stats;
+
+#ifdef MPMC_RING_QUEUE_COLLECT_STATISTICS
+ static constexpr bool CollectStatistics = true;
+#else
+ static constexpr bool CollectStatistics = false;
+#endif
+
+ static void IncrementSuccessSlowPushes() {
+ if constexpr (CollectStatistics) {
+ Stats.SuccessSlowPushes++;
+ Stats.SuccessPushes++;
+ }
+ }
+
+ static void IncrementSuccessFastPushes() {
+ if constexpr (CollectStatistics) {
+ Stats.SuccessFastPushes++;
+ Stats.SuccessPushes++;
+ }
+ }
+
+ static void IncrementChangesFastPushToSlowPush() {
+ if constexpr (CollectStatistics) {
+ Stats.ChangesFastPushToSlowPush++;
+ }
+ }
+
+ static void IncrementFailedPushes() {
+ if constexpr (CollectStatistics) {
+ Stats.FailedPushes++;
+ }
+ }
+
+ static void IncrementFailedSlowPushAttempts() {
+ if constexpr (CollectStatistics) {
+ Stats.FailedSlowPushAttempts++;
+ }
+ }
+
+ static TStats GetLocalStats() {
+ return Stats;
+ }
+};
+
template <ui32 MaxSizeBits>
struct TMPMCRingQueue {
static constexpr ui32 MaxSize = 1 << MaxSizeBits;
@@ -64,8 +131,8 @@ struct TMPMCRingQueue {
}
bool TryPushSlow(ui32 val) {
+ ui64 currentTail = Tail.load(std::memory_order_relaxed);
for (;;) {
- ui64 currentTail = Tail.load(std::memory_order_acquire);
ui32 generation = currentTail / MaxSize;
std::atomic<ui64> &currentSlot = Buffer[ConvertIdx(currentTail)];
@@ -74,6 +141,7 @@ struct TMPMCRingQueue {
do {
if (currentSlot.compare_exchange_weak(expected, val)) {
Tail.compare_exchange_strong(currentTail, currentTail + 1);
+ TMPMCRingQueueStats::IncrementSuccessSlowPushes();
return true;
}
slot = TSlot::Recognise(expected);
@@ -81,13 +149,14 @@ struct TMPMCRingQueue {
if (!slot.IsEmpty) {
ui64 currentHead = Head.load(std::memory_order_acquire);
- if (currentHead + MaxSize <= currentTail + std::min<ui64>(1024, MaxSize - 1)) {
+ if (currentHead + MaxSize <= currentTail + std::min<ui64>(64, MaxSize - 1)) {
return false;
}
}
- Tail.compare_exchange_strong(currentTail, currentTail + 1);
SpinLockPause();
+ TMPMCRingQueueStats::IncrementFailedSlowPushAttempts();
+ Tail.compare_exchange_strong(currentTail, currentTail + 1, std::memory_order_acq_rel);
}
}
@@ -100,26 +169,16 @@ struct TMPMCRingQueue {
ui64 expected = TSlot::MakeEmpty(generation);
do {
if (currentSlot.compare_exchange_weak(expected, val)) {
+ TMPMCRingQueueStats::IncrementSuccessFastPushes();
return true;
}
slot = TSlot::Recognise(expected);
} while (slot.Generation <= generation && slot.IsEmpty);
- if (!slot.IsEmpty) {
- ui64 nextTail = currentTail + 1;
- for (;;) {
- ui64 currentHead = Head.load(std::memory_order_acquire);
- if (currentHead + MaxSize <= currentTail + std::min<ui64>(1024, MaxSize - 1)) {
- if (Tail.compare_exchange_weak(nextTail, currentTail, std::memory_order_acq_rel)) {
- return false;
- }
- if (nextTail > currentTail && nextTail - currentTail < 16) {
- continue;
- }
- }
- break;
- }
- }
+ // TODO(kruall): mesure it's impact in bechmark
+ TMPMCRingQueueStats::IncrementChangesFastPushToSlowPush();
+ currentTail++;
+ Tail.compare_exchange_weak(currentTail, currentTail - 1, std::memory_order_relaxed);
return TryPushSlow(val);
}
@@ -217,7 +276,7 @@ struct TMPMCRingQueue {
if (!currentHead) {
currentHead = Head.load(std::memory_order_acquire);
}
- for (ui32 it = 0; it < 3; ++it) {
+ for (;;) {
ui32 generation = currentHead / MaxSize;
std::atomic<ui64> &currentSlot = Buffer[ConvertIdx(currentHead)];
diff --git a/ydb/library/actors/queues/mpmc_ring_queue_ut_base.h b/ydb/library/actors/queues/mpmc_ring_queue_ut_base.h
new file mode 100644
index 00000000000..f383285372a
--- /dev/null
+++ b/ydb/library/actors/queues/mpmc_ring_queue_ut_base.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include "mpmc_ring_queue.h"
+
+
+namespace NActors::NTests {
+
+ struct IQueue {
+ virtual ~IQueue() = default;
+ virtual bool TryPush(ui32 value) = 0;
+ virtual std::optional<ui32> TryPop() = 0;
+ };
+
+ template <ui32 SizeBits>
+ using TTryPush = bool (TMPMCRingQueue<SizeBits>::*)(ui32 value);
+ template <ui32 SizeBits>
+ using TTryPop = std::optional<ui32> (TMPMCRingQueue<SizeBits>::*)();
+
+ template <ui32 SizeBits, TTryPush<SizeBits> TryPushMethod, TTryPop<SizeBits> TryPopMethod>
+ struct TMPMCQueueBase : IQueue {
+ TMPMCRingQueue<SizeBits> *Queue;
+
+ TMPMCQueueBase(TMPMCRingQueue<SizeBits> *queue)
+ : Queue(queue)
+ {}
+
+ bool TryPush(ui32 value) final {
+ return (Queue->*TryPushMethod)(value);
+ }
+ std::optional<ui32> TryPop() final {
+ return (Queue->*TryPopMethod)();
+ }
+ };
+
+ template <ui32 SizeBits>
+ using TVerySlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopReallySlow>;
+
+ template <ui32 SizeBits>
+ using TSlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopSlow>;
+
+ template <ui32 SizeBits>
+ using TFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopFast>;
+
+ template <ui32 SizeBits>
+ using TVeryFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopReallyFast>;
+
+ template <ui32 SizeBits>
+ using TSingleQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopSingleConsumer>;
+
+}
diff --git a/ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp b/ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp
new file mode 100644
index 00000000000..9b49fd5a769
--- /dev/null
+++ b/ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp
@@ -0,0 +1,317 @@
+#define MPMC_RING_QUEUE_COLLECT_STATISTICS
+
+#include "mpmc_ring_queue.h"
+#include "mpmc_ring_queue_ut_base.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <memory>
+#include <util/random/random.h>
+#include <util/system/thread.h>
+
+#include <queue>
+
+
+using namespace NActors;
+using namespace NActors::NTests;
+
+namespace { // Tests
+
+ enum class EThreadAction {
+ Continue,
+ Sleep,
+ Kill,
+ };
+
+ struct TThreadAction {
+ EThreadAction Action;
+ ui64 SleepNs = 0;
+ };
+
+ enum class EWorkerAction {
+ Push,
+ Pop,
+ Sleep,
+ Kill,
+ };
+
+ enum class EExpectedStatus {
+ Nothing,
+ Success,
+ Failure,
+ RepeatUntilSuccess,
+ };
+
+ struct TWorkerAction {
+ EWorkerAction Action;
+ EExpectedStatus Expected = EExpectedStatus::Nothing;
+ std::optional<ui64> Value;
+
+ explicit operator TThreadAction() const {
+ switch (Action) {
+ case EWorkerAction::Push:
+ case EWorkerAction::Pop:
+ return {.Action=EThreadAction::Continue};
+ case EWorkerAction::Sleep:
+ UNIT_ASSERT(Value);
+ return {.Action=EThreadAction::Sleep, .SleepNs=*Value};
+ case EWorkerAction::Kill:
+ return {.Action=EThreadAction::Kill};
+ }
+ }
+ };
+
+ template <typename TQueue>
+ class TSimpleWorker {
+ public:
+ TSimpleWorker(TQueue *queue, const std::vector<TWorkerAction> actions, ui32 repeatCount = 1)
+ : Queue(queue)
+ , Actions(actions)
+ , RepeatCount(repeatCount)
+ {
+ UNIT_ASSERT(actions.size());
+ }
+
+ TThreadAction Do() {
+ if (Idx == Actions.size()) {
+ Idx = 0;
+ if (--RepeatCount == 0) {
+ return {.Action=EThreadAction::Kill};
+ }
+ }
+ TWorkerAction &action = Actions[Idx++];
+ switch (action.Action) {
+ case EWorkerAction::Push: {
+ UNIT_ASSERT(action.Value);
+ bool success = Queue->TryPush(*action.Value);
+ if (action.Expected == EExpectedStatus::RepeatUntilSuccess) {
+ while (!success) {
+ success = Queue->TryPush(*action.Value);
+ }
+ } else if (action.Expected != EExpectedStatus::Nothing) {
+ UNIT_ASSERT_VALUES_EQUAL(success, (action.Expected == EExpectedStatus::Success));
+ }
+ }
+ break;
+ case EWorkerAction::Pop: {
+ auto value = Queue->TryPop();
+ if (action.Expected == EExpectedStatus::RepeatUntilSuccess) {
+ while (!value) {
+ value = Queue->TryPop();
+ }
+ } else if (action.Expected != EExpectedStatus::Nothing) {
+ UNIT_ASSERT_VALUES_EQUAL(bool(value), (action.Expected == EExpectedStatus::Success));
+ if (value && action.Value) {
+ UNIT_ASSERT_VALUES_EQUAL(value, action.Value);
+ }
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ return static_cast<TThreadAction>(action);
+ }
+
+ private:
+ TQueue *Queue;
+ std::vector<TWorkerAction> Actions;
+ ui32 Idx = 0;
+ ui32 RepeatCount = 0;
+ };
+
+ struct TStatsCollector {
+ TMutex Mutex;
+ NActors::TMPMCRingQueueStats::TStats Stats;
+
+ void AddStats(const NActors::TMPMCRingQueueStats::TStats &stats) {
+ TGuard<TMutex> guard(Mutex);
+ Stats += stats;
+ }
+ };
+
+ template <typename TWorker>
+ class TTestThread : public ISimpleThread {
+ public:
+ TTestThread(TWorker worker, TStatsCollector *statsCollector = nullptr)
+ : Worker(worker)
+ , StatsCollector(statsCollector)
+ {}
+
+ ~TTestThread() = default;
+
+ private:
+ bool Process(const TThreadAction &action) {
+ switch (action.Action) {
+ case EThreadAction::Continue:
+ break;
+ case EThreadAction::Sleep:
+ NanoSleep(action.SleepNs);
+ break;
+ case EThreadAction::Kill:
+ if (StatsCollector) {
+ auto stats = NActors::TMPMCRingQueueStats::GetLocalStats();
+ // Cerr << (TStringBuilder() << "thread: " << (ui64)this << " pushes: " << stats.SuccessPushes.load() << Endl);
+ StatsCollector->AddStats(stats);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ void* ThreadProc() final {
+ for (;;) {
+ TThreadAction action = Worker.Do();
+ if (Process(action)) {
+ break;
+ }
+ }
+ return nullptr;
+ }
+
+ private:
+ TWorker Worker;
+ TStatsCollector *StatsCollector;
+ };
+
+ void RunThreads(const std::vector<ISimpleThread*> &threads) {
+ for (auto &thread : threads) {
+ thread->Start();
+ }
+ for (auto &thread : threads) {
+ thread->Join();
+ }
+ }
+
+ void RunThreads(const std::vector<std::unique_ptr<ISimpleThread>> &threads) {
+ for (auto &thread : threads) {
+ thread->Start();
+ }
+ for (auto &thread : threads) {
+ thread->Join();
+ }
+ }
+
+ template <typename ...TThreads>
+ void RunThreads(std::unique_ptr<TThreads>&& ...threads) {
+ RunThreads(std::vector<ISimpleThread*>{threads.release()...});
+ }
+
+
+ template <ui32 SizeBits>
+ struct TTestCases {
+
+ template <template <ui32> typename TQueueAdaptor>
+ static TStatsCollector BasicPushPopSingleThread() {
+ TMPMCRingQueue<SizeBits> realQueue;
+ TQueueAdaptor<SizeBits> adapter(&realQueue);
+ TStatsCollector collector;
+ TSimpleWorker<decltype(adapter)> worker(
+ &adapter,
+ {
+ TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=1},
+ TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=2},
+ TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=1},
+ TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=2},
+ }
+ );
+ RunThreads(std::make_unique<TTestThread<decltype(worker)>>(worker, &collector));
+ return std::move(collector);
+ }
+
+ template <template <ui32> typename TQueueAdaptor, ui32 ThreadCount, ui32 RepeatCount>
+ static TStatsCollector BasicPushPopMultiThreads() {
+ TMPMCRingQueue<SizeBits> realQueue;
+ TVector<std::unique_ptr<IQueue>> adapters;
+ TVector<std::unique_ptr<ISimpleThread>> threads;
+ TStatsCollector collector;
+ for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) {
+ TQueueAdaptor<SizeBits> *adapter = new TQueueAdaptor<SizeBits>(&realQueue);
+ adapters.emplace_back(adapter);
+ TSimpleWorker<std::decay_t<decltype(*adapter)>> worker(
+ adapter,
+ {
+ TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=1},
+ TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=1},
+ },
+ RepeatCount
+ );
+ threads.emplace_back(new TTestThread<decltype(worker)>(worker, &collector));
+ }
+ RunThreads(threads);
+ return std::move(collector);
+ }
+ };
+
+}
+
+
+#define BASIC_PUSH_POP_SINGLE_THREAD_FAST(QUEUE) \
+ Y_UNIT_TEST(BasicPushPopSingleThread_ ## QUEUE) { \
+ TStatsCollector collector = TTestCases<10>::BasicPushPopSingleThread<QUEUE>(); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, 2); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessFastPushes, 2); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessSlowPushes, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedSlowPushAttempts, 0); \
+ } \
+// end BASIC_PUSH_POP_SINGLE_THREAD
+
+
+#define BASIC_PUSH_POP_SINGLE_THREAD_SLOW(QUEUE) \
+ Y_UNIT_TEST(BasicPushPopSingleThread_ ## QUEUE) { \
+ TStatsCollector collector = TTestCases<10>::BasicPushPopSingleThread<QUEUE>(); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, 2); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessFastPushes, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessSlowPushes, 2); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedSlowPushAttempts, 0); \
+ } \
+// end BASIC_PUSH_POP_SINGLE_THREAD
+
+
+#define BASIC_PUSH_POP_MUTLI_THREADS_FAST(QUEUE) \
+ Y_UNIT_TEST(BasicPushPopMultiThreads_ ## QUEUE) { \
+ constexpr ui32 ThreadCount = 10; \
+ constexpr ui32 RepeatCount = 1000; \
+ TStatsCollector collector = TTestCases<10>::BasicPushPopMultiThreads<QUEUE, ThreadCount, RepeatCount>();\
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, RepeatCount * ThreadCount); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, collector.Stats.SuccessSlowPushes); \
+ UNIT_ASSERT_VALUES_EQUAL( \
+ collector.Stats.SuccessFastPushes + collector.Stats.SuccessSlowPushes, \
+ collector.Stats.SuccessPushes \
+ ); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \
+ } \
+// end BASIC_PUSH_POP_MUTLI_THREADS_FAST
+
+
+#define BASIC_PUSH_POP_MUTLI_THREADS_SLOW(QUEUE) \
+ Y_UNIT_TEST(BasicPushPopMultiThreads_ ## QUEUE) { \
+ constexpr ui32 ThreadCount = 10; \
+ constexpr ui32 RepeatCount = 1000; \
+ TStatsCollector collector = TTestCases<10>::BasicPushPopMultiThreads<QUEUE, ThreadCount, RepeatCount>();\
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, RepeatCount * ThreadCount); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessFastPushes, 0); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessSlowPushes, collector.Stats.SuccessPushes); \
+ UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \
+ } \
+// end BASIC_PUSH_POP_MUTLI_THREADS_SLOW
+
+
+Y_UNIT_TEST_SUITE(MPMCRingQueueMultiThreadsTests) {
+
+ BASIC_PUSH_POP_SINGLE_THREAD_FAST(TVeryFastQueue)
+ BASIC_PUSH_POP_SINGLE_THREAD_FAST(TFastQueue)
+ BASIC_PUSH_POP_SINGLE_THREAD_SLOW(TSlowQueue)
+ BASIC_PUSH_POP_SINGLE_THREAD_SLOW(TVerySlowQueue)
+
+ BASIC_PUSH_POP_MUTLI_THREADS_FAST(TVeryFastQueue)
+ BASIC_PUSH_POP_MUTLI_THREADS_FAST(TFastQueue)
+ BASIC_PUSH_POP_MUTLI_THREADS_SLOW(TSlowQueue)
+ BASIC_PUSH_POP_MUTLI_THREADS_SLOW(TVerySlowQueue)
+
+}
diff --git a/ydb/library/actors/queues/mpmc_ring_queue_ut.cpp b/ydb/library/actors/queues/mpmc_ring_queue_ut_single_thread.cpp
index 14807f96641..d7f20755a7b 100644
--- a/ydb/library/actors/queues/mpmc_ring_queue_ut.cpp
+++ b/ydb/library/actors/queues/mpmc_ring_queue_ut_single_thread.cpp
@@ -1,66 +1,17 @@
+#define MPMC_RING_QUEUE_COLLECT_STATISTICS
+
#include "mpmc_ring_queue.h"
+#include "mpmc_ring_queue_ut_base.h"
#include <library/cpp/testing/unittest/registar.h>
#include <util/random/random.h>
-#include <util/system/thread.h>
#include <queue>
-namespace {
- using namespace NActors;
-
- class TThread : public ISimpleThread {
-
- TThread() = default;
-
- private:
- void* ThreadProc() override;
- };
-
- struct IQueue {
- virtual bool TryPush(ui32 value) = 0;
- virtual std::optional<ui32> TryPop() = 0;
- };
-
- template <ui32 SizeBits>
- using TTryPush = bool (TMPMCRingQueue<SizeBits>::*)(ui32 value);
- template <ui32 SizeBits>
- using TTryPop = std::optional<ui32> (TMPMCRingQueue<SizeBits>::*)();
-
- template <ui32 SizeBits, TTryPush<SizeBits> TryPushMethod, TTryPop<SizeBits> TryPopMethod>
- struct TMPMCQueueBase : IQueue {
- TMPMCRingQueue<SizeBits> &Queue;
-
- TMPMCQueueBase(TMPMCRingQueue<SizeBits> &queue)
- : Queue(queue)
- {}
-
- bool TryPush(ui32 value) override {
- return (Queue.*TryPushMethod)(value);
- }
- std::optional<ui32> TryPop() override {
- return (Queue.*TryPopMethod)();
- }
- };
-
- template <ui32 SizeBits>
- using TVerySlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopReallySlow>;
-
- template <ui32 SizeBits>
- using TSlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopSlow>;
-
- template <ui32 SizeBits>
- using TFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopFast>;
-
- template <ui32 SizeBits>
- using TVeryFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopReallyFast>;
-
- template <ui32 SizeBits>
- using TSingleQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopSingleConsumer>;
-
-}
+using namespace NActors;
+using namespace NActors::NTests;
namespace { // Tests
@@ -94,7 +45,7 @@ namespace { // Tests
static void PushesPopsWithShift() {
TMPMCRingQueue<SizeBits> realQueue;
- TQueueAdaptor<SizeBits> adaptor(realQueue);
+ TQueueAdaptor<SizeBits> adaptor(&realQueue);
for (ui32 it = 0; it < MaxSize; ++it) {
for (ui32 idx = 0; idx < MaxSize - 1; ++idx) {
@@ -133,7 +84,7 @@ namespace { // Tests
static void PushesOverloadPops() {
TMPMCRingQueue<SizeBits> realQueue;
- TQueueAdaptor<SizeBits> adaptor(realQueue);
+ TQueueAdaptor<SizeBits> adaptor(&realQueue);
for (ui32 it = 0; it < MaxSize; ++it) {
for (ui32 idx = 0; idx < MaxSize; ++idx) {
@@ -172,7 +123,7 @@ namespace { // Tests
return;
}
TMPMCRingQueue<SizeBits> realQueue;
- TQueueAdaptor<SizeBits> adaptor(realQueue);
+ TQueueAdaptor<SizeBits> adaptor(&realQueue);
ui64 emptyZeroGeneration = (ui64(1) << 63);
@@ -206,7 +157,7 @@ namespace { // Tests
static void CheckSlowPops() {
TMPMCRingQueue<SizeBits> realQueue;
- TQueueAdaptor<SizeBits> adaptor(realQueue);
+ TQueueAdaptor<SizeBits> adaptor(&realQueue);
ui64 emptyZeroGeneration = (ui64(1) << 63);
for (ui32 it = 0; it < MaxSize; ++it) {
for (ui32 idx = 0; idx < MaxSize; ++idx) {
@@ -224,7 +175,7 @@ namespace { // Tests
static void CheckFastPops() {
TMPMCRingQueue<SizeBits> realQueue;
- TQueueAdaptor<SizeBits> adaptor(realQueue);
+ TQueueAdaptor<SizeBits> adaptor(&realQueue);
for (ui32 it = 0; it < MaxSize; ++it) {
for (ui32 idx = 0; idx < MaxSize; ++idx) {
ui64 emptyCurrentGeneration = (ui64(1) << 63) + (ui64)it;
@@ -304,7 +255,7 @@ namespace { // Tests
constexpr ui32 SizeBits = 3;
-Y_UNIT_TEST_SUITE(MPMCRingQueueTests) {
+Y_UNIT_TEST_SUITE(MPMCRingQueueSingleThreadTests) {
BASIC_QUEUE_TEST_CASES(SizeBits, TSingleQueue);
BASIC_QUEUE_TEST_CASES(SizeBits, TVerySlowQueue);
BASIC_QUEUE_TEST_CASES(SizeBits, TSlowQueue);
@@ -322,8 +273,8 @@ Y_UNIT_TEST_SUITE(MPMCRingQueueTests) {
TestRandomUsage(
10'000,
(ui64(1) << SizeBits),
- TVeryFastQueue<SizeBits>(realQueue),
- TFastQueue<SizeBits>(realQueue)
+ TVeryFastQueue<SizeBits>(&realQueue),
+ TFastQueue<SizeBits>(&realQueue)
);
}
@@ -332,8 +283,8 @@ Y_UNIT_TEST_SUITE(MPMCRingQueueTests) {
TestRandomUsage(
10'000,
(ui64(1) << SizeBits),
- TVerySlowQueue<SizeBits>(realQueue),
- TSlowQueue<SizeBits>(realQueue)
+ TVerySlowQueue<SizeBits>(&realQueue),
+ TSlowQueue<SizeBits>(&realQueue)
);
}
@@ -342,10 +293,10 @@ Y_UNIT_TEST_SUITE(MPMCRingQueueTests) {
TestRandomUsage(
100'000,
(ui64(1) << SizeBits),
- TVerySlowQueue<SizeBits>(realQueue),
- TSlowQueue<SizeBits>(realQueue),
- TFastQueue<SizeBits>(realQueue),
- TVeryFastQueue<SizeBits>(realQueue)
+ TVerySlowQueue<SizeBits>(&realQueue),
+ TSlowQueue<SizeBits>(&realQueue),
+ TFastQueue<SizeBits>(&realQueue),
+ TVeryFastQueue<SizeBits>(&realQueue)
);
}
}
diff --git a/ydb/library/actors/queues/ut/ya.make b/ydb/library/actors/queues/ut/ya.make
index ee73ff5fd24..c5c4c5cd3e7 100644
--- a/ydb/library/actors/queues/ut/ya.make
+++ b/ydb/library/actors/queues/ut/ya.make
@@ -6,7 +6,8 @@ IF (WITH_VALGRIND)
ENDIF()
SRCS(
- mpmc_ring_queue_ut.cpp
+ mpmc_ring_queue_ut_single_thread.cpp
+ mpmc_ring_queue_ut_multi_threads.cpp
)
END()
diff --git a/ydb/library/actors/queues/ya.make b/ydb/library/actors/queues/ya.make
index 621607abe22..e7dee4eca57 100644
--- a/ydb/library/actors/queues/ya.make
+++ b/ydb/library/actors/queues/ya.make
@@ -2,7 +2,7 @@ LIBRARY()
SRCS(
activation_queue.h
- mpmc_ring_queue.h
+ mpmc_ring_queue.cpp
)
END()