diff options
| author | kruall <[email protected]> | 2024-04-10 16:58:00 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-04-10 16:58:00 +0300 |
| commit | b0ef85ebd550677bc732c050f0aeba245cfda17e (patch) | |
| tree | c536164f5fc71cb17f0a46420d196a4538c246c5 | |
| parent | a19b5f1a1be3a3102498effdee8f0e3e2f9c81d5 (diff) | |
Add basic multithread tests fot MPMCRingQueue (#3631)
| -rw-r--r-- | ydb/library/actors/queues/mpmc_ring_queue.cpp | 3 | ||||
| -rw-r--r-- | ydb/library/actors/queues/mpmc_ring_queue.h | 97 | ||||
| -rw-r--r-- | ydb/library/actors/queues/mpmc_ring_queue_ut_base.h | 50 | ||||
| -rw-r--r-- | ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp | 317 | ||||
| -rw-r--r-- | ydb/library/actors/queues/mpmc_ring_queue_ut_single_thread.cpp (renamed from ydb/library/actors/queues/mpmc_ring_queue_ut.cpp) | 87 | ||||
| -rw-r--r-- | ydb/library/actors/queues/ut/ya.make | 3 | ||||
| -rw-r--r-- | ydb/library/actors/queues/ya.make | 2 |
7 files changed, 470 insertions, 89 deletions
diff --git a/ydb/library/actors/queues/mpmc_ring_queue.cpp b/ydb/library/actors/queues/mpmc_ring_queue.cpp new file mode 100644 index 00000000000..df186ee66e7 --- /dev/null +++ b/ydb/library/actors/queues/mpmc_ring_queue.cpp @@ -0,0 +1,3 @@ +#include "mpmc_ring_queue.h" + +thread_local NActors::TMPMCRingQueueStats::TStats NActors::TMPMCRingQueueStats::Stats; diff --git a/ydb/library/actors/queues/mpmc_ring_queue.h b/ydb/library/actors/queues/mpmc_ring_queue.h index 49466478698..45bc4950fd8 100644 --- a/ydb/library/actors/queues/mpmc_ring_queue.h +++ b/ydb/library/actors/queues/mpmc_ring_queue.h @@ -2,12 +2,79 @@ #include "defs.h" #include <library/cpp/threading/chunk_queue/queue.h> +#include <util/system/mutex.h> #include <atomic> #include <optional> namespace NActors { +struct TMPMCRingQueueStats { + + struct TStats { + ui64 SuccessPushes = 0; + ui64 SuccessSlowPushes = 0; + ui64 SuccessFastPushes = 0; + ui64 FailedPushes = 0; + ui64 FailedSlowPushAttempts = 0; + + ui64 ChangesFastPushToSlowPush = 0; + + TStats& operator += (const TStats &other) { + SuccessPushes += other.SuccessPushes; + SuccessSlowPushes += other.SuccessSlowPushes; + SuccessFastPushes += other.SuccessFastPushes; + ChangesFastPushToSlowPush += other.ChangesFastPushToSlowPush; + FailedPushes += other.FailedPushes; + FailedSlowPushAttempts += other.FailedSlowPushAttempts; + return *this; + } + }; + static thread_local TStats Stats; + +#ifdef MPMC_RING_QUEUE_COLLECT_STATISTICS + static constexpr bool CollectStatistics = true; +#else + static constexpr bool CollectStatistics = false; +#endif + + static void IncrementSuccessSlowPushes() { + if constexpr (CollectStatistics) { + Stats.SuccessSlowPushes++; + Stats.SuccessPushes++; + } + } + + static void IncrementSuccessFastPushes() { + if constexpr (CollectStatistics) { + Stats.SuccessFastPushes++; + Stats.SuccessPushes++; + } + } + + static void IncrementChangesFastPushToSlowPush() { + if constexpr (CollectStatistics) { + Stats.ChangesFastPushToSlowPush++; + } + } + + static void IncrementFailedPushes() { + if constexpr (CollectStatistics) { + Stats.FailedPushes++; + } + } + + static void IncrementFailedSlowPushAttempts() { + if constexpr (CollectStatistics) { + Stats.FailedSlowPushAttempts++; + } + } + + static TStats GetLocalStats() { + return Stats; + } +}; + template <ui32 MaxSizeBits> struct TMPMCRingQueue { static constexpr ui32 MaxSize = 1 << MaxSizeBits; @@ -64,8 +131,8 @@ struct TMPMCRingQueue { } bool TryPushSlow(ui32 val) { + ui64 currentTail = Tail.load(std::memory_order_relaxed); for (;;) { - ui64 currentTail = Tail.load(std::memory_order_acquire); ui32 generation = currentTail / MaxSize; std::atomic<ui64> ¤tSlot = Buffer[ConvertIdx(currentTail)]; @@ -74,6 +141,7 @@ struct TMPMCRingQueue { do { if (currentSlot.compare_exchange_weak(expected, val)) { Tail.compare_exchange_strong(currentTail, currentTail + 1); + TMPMCRingQueueStats::IncrementSuccessSlowPushes(); return true; } slot = TSlot::Recognise(expected); @@ -81,13 +149,14 @@ struct TMPMCRingQueue { if (!slot.IsEmpty) { ui64 currentHead = Head.load(std::memory_order_acquire); - if (currentHead + MaxSize <= currentTail + std::min<ui64>(1024, MaxSize - 1)) { + if (currentHead + MaxSize <= currentTail + std::min<ui64>(64, MaxSize - 1)) { return false; } } - Tail.compare_exchange_strong(currentTail, currentTail + 1); SpinLockPause(); + TMPMCRingQueueStats::IncrementFailedSlowPushAttempts(); + Tail.compare_exchange_strong(currentTail, currentTail + 1, std::memory_order_acq_rel); } } @@ -100,26 +169,16 @@ struct TMPMCRingQueue { ui64 expected = TSlot::MakeEmpty(generation); do { if (currentSlot.compare_exchange_weak(expected, val)) { + TMPMCRingQueueStats::IncrementSuccessFastPushes(); return true; } slot = TSlot::Recognise(expected); } while (slot.Generation <= generation && slot.IsEmpty); - if (!slot.IsEmpty) { - ui64 nextTail = currentTail + 1; - for (;;) { - ui64 currentHead = Head.load(std::memory_order_acquire); - if (currentHead + MaxSize <= currentTail + std::min<ui64>(1024, MaxSize - 1)) { - if (Tail.compare_exchange_weak(nextTail, currentTail, std::memory_order_acq_rel)) { - return false; - } - if (nextTail > currentTail && nextTail - currentTail < 16) { - continue; - } - } - break; - } - } + // TODO(kruall): mesure it's impact in bechmark + TMPMCRingQueueStats::IncrementChangesFastPushToSlowPush(); + currentTail++; + Tail.compare_exchange_weak(currentTail, currentTail - 1, std::memory_order_relaxed); return TryPushSlow(val); } @@ -217,7 +276,7 @@ struct TMPMCRingQueue { if (!currentHead) { currentHead = Head.load(std::memory_order_acquire); } - for (ui32 it = 0; it < 3; ++it) { + for (;;) { ui32 generation = currentHead / MaxSize; std::atomic<ui64> ¤tSlot = Buffer[ConvertIdx(currentHead)]; diff --git a/ydb/library/actors/queues/mpmc_ring_queue_ut_base.h b/ydb/library/actors/queues/mpmc_ring_queue_ut_base.h new file mode 100644 index 00000000000..f383285372a --- /dev/null +++ b/ydb/library/actors/queues/mpmc_ring_queue_ut_base.h @@ -0,0 +1,50 @@ +#pragma once + +#include "mpmc_ring_queue.h" + + +namespace NActors::NTests { + + struct IQueue { + virtual ~IQueue() = default; + virtual bool TryPush(ui32 value) = 0; + virtual std::optional<ui32> TryPop() = 0; + }; + + template <ui32 SizeBits> + using TTryPush = bool (TMPMCRingQueue<SizeBits>::*)(ui32 value); + template <ui32 SizeBits> + using TTryPop = std::optional<ui32> (TMPMCRingQueue<SizeBits>::*)(); + + template <ui32 SizeBits, TTryPush<SizeBits> TryPushMethod, TTryPop<SizeBits> TryPopMethod> + struct TMPMCQueueBase : IQueue { + TMPMCRingQueue<SizeBits> *Queue; + + TMPMCQueueBase(TMPMCRingQueue<SizeBits> *queue) + : Queue(queue) + {} + + bool TryPush(ui32 value) final { + return (Queue->*TryPushMethod)(value); + } + std::optional<ui32> TryPop() final { + return (Queue->*TryPopMethod)(); + } + }; + + template <ui32 SizeBits> + using TVerySlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopReallySlow>; + + template <ui32 SizeBits> + using TSlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopSlow>; + + template <ui32 SizeBits> + using TFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopFast>; + + template <ui32 SizeBits> + using TVeryFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopReallyFast>; + + template <ui32 SizeBits> + using TSingleQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopSingleConsumer>; + +} diff --git a/ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp b/ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp new file mode 100644 index 00000000000..9b49fd5a769 --- /dev/null +++ b/ydb/library/actors/queues/mpmc_ring_queue_ut_multi_threads.cpp @@ -0,0 +1,317 @@ +#define MPMC_RING_QUEUE_COLLECT_STATISTICS + +#include "mpmc_ring_queue.h" +#include "mpmc_ring_queue_ut_base.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <memory> +#include <util/random/random.h> +#include <util/system/thread.h> + +#include <queue> + + +using namespace NActors; +using namespace NActors::NTests; + +namespace { // Tests + + enum class EThreadAction { + Continue, + Sleep, + Kill, + }; + + struct TThreadAction { + EThreadAction Action; + ui64 SleepNs = 0; + }; + + enum class EWorkerAction { + Push, + Pop, + Sleep, + Kill, + }; + + enum class EExpectedStatus { + Nothing, + Success, + Failure, + RepeatUntilSuccess, + }; + + struct TWorkerAction { + EWorkerAction Action; + EExpectedStatus Expected = EExpectedStatus::Nothing; + std::optional<ui64> Value; + + explicit operator TThreadAction() const { + switch (Action) { + case EWorkerAction::Push: + case EWorkerAction::Pop: + return {.Action=EThreadAction::Continue}; + case EWorkerAction::Sleep: + UNIT_ASSERT(Value); + return {.Action=EThreadAction::Sleep, .SleepNs=*Value}; + case EWorkerAction::Kill: + return {.Action=EThreadAction::Kill}; + } + } + }; + + template <typename TQueue> + class TSimpleWorker { + public: + TSimpleWorker(TQueue *queue, const std::vector<TWorkerAction> actions, ui32 repeatCount = 1) + : Queue(queue) + , Actions(actions) + , RepeatCount(repeatCount) + { + UNIT_ASSERT(actions.size()); + } + + TThreadAction Do() { + if (Idx == Actions.size()) { + Idx = 0; + if (--RepeatCount == 0) { + return {.Action=EThreadAction::Kill}; + } + } + TWorkerAction &action = Actions[Idx++]; + switch (action.Action) { + case EWorkerAction::Push: { + UNIT_ASSERT(action.Value); + bool success = Queue->TryPush(*action.Value); + if (action.Expected == EExpectedStatus::RepeatUntilSuccess) { + while (!success) { + success = Queue->TryPush(*action.Value); + } + } else if (action.Expected != EExpectedStatus::Nothing) { + UNIT_ASSERT_VALUES_EQUAL(success, (action.Expected == EExpectedStatus::Success)); + } + } + break; + case EWorkerAction::Pop: { + auto value = Queue->TryPop(); + if (action.Expected == EExpectedStatus::RepeatUntilSuccess) { + while (!value) { + value = Queue->TryPop(); + } + } else if (action.Expected != EExpectedStatus::Nothing) { + UNIT_ASSERT_VALUES_EQUAL(bool(value), (action.Expected == EExpectedStatus::Success)); + if (value && action.Value) { + UNIT_ASSERT_VALUES_EQUAL(value, action.Value); + } + } + } + break; + default: + break; + } + return static_cast<TThreadAction>(action); + } + + private: + TQueue *Queue; + std::vector<TWorkerAction> Actions; + ui32 Idx = 0; + ui32 RepeatCount = 0; + }; + + struct TStatsCollector { + TMutex Mutex; + NActors::TMPMCRingQueueStats::TStats Stats; + + void AddStats(const NActors::TMPMCRingQueueStats::TStats &stats) { + TGuard<TMutex> guard(Mutex); + Stats += stats; + } + }; + + template <typename TWorker> + class TTestThread : public ISimpleThread { + public: + TTestThread(TWorker worker, TStatsCollector *statsCollector = nullptr) + : Worker(worker) + , StatsCollector(statsCollector) + {} + + ~TTestThread() = default; + + private: + bool Process(const TThreadAction &action) { + switch (action.Action) { + case EThreadAction::Continue: + break; + case EThreadAction::Sleep: + NanoSleep(action.SleepNs); + break; + case EThreadAction::Kill: + if (StatsCollector) { + auto stats = NActors::TMPMCRingQueueStats::GetLocalStats(); + // Cerr << (TStringBuilder() << "thread: " << (ui64)this << " pushes: " << stats.SuccessPushes.load() << Endl); + StatsCollector->AddStats(stats); + } + return true; + } + return false; + } + + void* ThreadProc() final { + for (;;) { + TThreadAction action = Worker.Do(); + if (Process(action)) { + break; + } + } + return nullptr; + } + + private: + TWorker Worker; + TStatsCollector *StatsCollector; + }; + + void RunThreads(const std::vector<ISimpleThread*> &threads) { + for (auto &thread : threads) { + thread->Start(); + } + for (auto &thread : threads) { + thread->Join(); + } + } + + void RunThreads(const std::vector<std::unique_ptr<ISimpleThread>> &threads) { + for (auto &thread : threads) { + thread->Start(); + } + for (auto &thread : threads) { + thread->Join(); + } + } + + template <typename ...TThreads> + void RunThreads(std::unique_ptr<TThreads>&& ...threads) { + RunThreads(std::vector<ISimpleThread*>{threads.release()...}); + } + + + template <ui32 SizeBits> + struct TTestCases { + + template <template <ui32> typename TQueueAdaptor> + static TStatsCollector BasicPushPopSingleThread() { + TMPMCRingQueue<SizeBits> realQueue; + TQueueAdaptor<SizeBits> adapter(&realQueue); + TStatsCollector collector; + TSimpleWorker<decltype(adapter)> worker( + &adapter, + { + TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=1}, + TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=2}, + TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=1}, + TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=2}, + } + ); + RunThreads(std::make_unique<TTestThread<decltype(worker)>>(worker, &collector)); + return std::move(collector); + } + + template <template <ui32> typename TQueueAdaptor, ui32 ThreadCount, ui32 RepeatCount> + static TStatsCollector BasicPushPopMultiThreads() { + TMPMCRingQueue<SizeBits> realQueue; + TVector<std::unique_ptr<IQueue>> adapters; + TVector<std::unique_ptr<ISimpleThread>> threads; + TStatsCollector collector; + for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) { + TQueueAdaptor<SizeBits> *adapter = new TQueueAdaptor<SizeBits>(&realQueue); + adapters.emplace_back(adapter); + TSimpleWorker<std::decay_t<decltype(*adapter)>> worker( + adapter, + { + TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=1}, + TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=1}, + }, + RepeatCount + ); + threads.emplace_back(new TTestThread<decltype(worker)>(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + }; + +} + + +#define BASIC_PUSH_POP_SINGLE_THREAD_FAST(QUEUE) \ + Y_UNIT_TEST(BasicPushPopSingleThread_ ## QUEUE) { \ + TStatsCollector collector = TTestCases<10>::BasicPushPopSingleThread<QUEUE>(); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, 2); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessFastPushes, 2); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessSlowPushes, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedSlowPushAttempts, 0); \ + } \ +// end BASIC_PUSH_POP_SINGLE_THREAD + + +#define BASIC_PUSH_POP_SINGLE_THREAD_SLOW(QUEUE) \ + Y_UNIT_TEST(BasicPushPopSingleThread_ ## QUEUE) { \ + TStatsCollector collector = TTestCases<10>::BasicPushPopSingleThread<QUEUE>(); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, 2); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessFastPushes, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessSlowPushes, 2); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedSlowPushAttempts, 0); \ + } \ +// end BASIC_PUSH_POP_SINGLE_THREAD + + +#define BASIC_PUSH_POP_MUTLI_THREADS_FAST(QUEUE) \ + Y_UNIT_TEST(BasicPushPopMultiThreads_ ## QUEUE) { \ + constexpr ui32 ThreadCount = 10; \ + constexpr ui32 RepeatCount = 1000; \ + TStatsCollector collector = TTestCases<10>::BasicPushPopMultiThreads<QUEUE, ThreadCount, RepeatCount>();\ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, RepeatCount * ThreadCount); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, collector.Stats.SuccessSlowPushes); \ + UNIT_ASSERT_VALUES_EQUAL( \ + collector.Stats.SuccessFastPushes + collector.Stats.SuccessSlowPushes, \ + collector.Stats.SuccessPushes \ + ); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \ + } \ +// end BASIC_PUSH_POP_MUTLI_THREADS_FAST + + +#define BASIC_PUSH_POP_MUTLI_THREADS_SLOW(QUEUE) \ + Y_UNIT_TEST(BasicPushPopMultiThreads_ ## QUEUE) { \ + constexpr ui32 ThreadCount = 10; \ + constexpr ui32 RepeatCount = 1000; \ + TStatsCollector collector = TTestCases<10>::BasicPushPopMultiThreads<QUEUE, ThreadCount, RepeatCount>();\ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessPushes, RepeatCount * ThreadCount); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.ChangesFastPushToSlowPush, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessFastPushes, 0); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.SuccessSlowPushes, collector.Stats.SuccessPushes); \ + UNIT_ASSERT_VALUES_EQUAL(collector.Stats.FailedPushes, 0); \ + } \ +// end BASIC_PUSH_POP_MUTLI_THREADS_SLOW + + +Y_UNIT_TEST_SUITE(MPMCRingQueueMultiThreadsTests) { + + BASIC_PUSH_POP_SINGLE_THREAD_FAST(TVeryFastQueue) + BASIC_PUSH_POP_SINGLE_THREAD_FAST(TFastQueue) + BASIC_PUSH_POP_SINGLE_THREAD_SLOW(TSlowQueue) + BASIC_PUSH_POP_SINGLE_THREAD_SLOW(TVerySlowQueue) + + BASIC_PUSH_POP_MUTLI_THREADS_FAST(TVeryFastQueue) + BASIC_PUSH_POP_MUTLI_THREADS_FAST(TFastQueue) + BASIC_PUSH_POP_MUTLI_THREADS_SLOW(TSlowQueue) + BASIC_PUSH_POP_MUTLI_THREADS_SLOW(TVerySlowQueue) + +} diff --git a/ydb/library/actors/queues/mpmc_ring_queue_ut.cpp b/ydb/library/actors/queues/mpmc_ring_queue_ut_single_thread.cpp index 14807f96641..d7f20755a7b 100644 --- a/ydb/library/actors/queues/mpmc_ring_queue_ut.cpp +++ b/ydb/library/actors/queues/mpmc_ring_queue_ut_single_thread.cpp @@ -1,66 +1,17 @@ +#define MPMC_RING_QUEUE_COLLECT_STATISTICS + #include "mpmc_ring_queue.h" +#include "mpmc_ring_queue_ut_base.h" #include <library/cpp/testing/unittest/registar.h> #include <util/random/random.h> -#include <util/system/thread.h> #include <queue> -namespace { - using namespace NActors; - - class TThread : public ISimpleThread { - - TThread() = default; - - private: - void* ThreadProc() override; - }; - - struct IQueue { - virtual bool TryPush(ui32 value) = 0; - virtual std::optional<ui32> TryPop() = 0; - }; - - template <ui32 SizeBits> - using TTryPush = bool (TMPMCRingQueue<SizeBits>::*)(ui32 value); - template <ui32 SizeBits> - using TTryPop = std::optional<ui32> (TMPMCRingQueue<SizeBits>::*)(); - - template <ui32 SizeBits, TTryPush<SizeBits> TryPushMethod, TTryPop<SizeBits> TryPopMethod> - struct TMPMCQueueBase : IQueue { - TMPMCRingQueue<SizeBits> &Queue; - - TMPMCQueueBase(TMPMCRingQueue<SizeBits> &queue) - : Queue(queue) - {} - - bool TryPush(ui32 value) override { - return (Queue.*TryPushMethod)(value); - } - std::optional<ui32> TryPop() override { - return (Queue.*TryPopMethod)(); - } - }; - - template <ui32 SizeBits> - using TVerySlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopReallySlow>; - - template <ui32 SizeBits> - using TSlowQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPushSlow, &TMPMCRingQueue<SizeBits>::TryPopSlow>; - - template <ui32 SizeBits> - using TFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopFast>; - - template <ui32 SizeBits> - using TVeryFastQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopReallyFast>; - - template <ui32 SizeBits> - using TSingleQueue = TMPMCQueueBase<SizeBits, &TMPMCRingQueue<SizeBits>::TryPush, &TMPMCRingQueue<SizeBits>::TryPopSingleConsumer>; - -} +using namespace NActors; +using namespace NActors::NTests; namespace { // Tests @@ -94,7 +45,7 @@ namespace { // Tests static void PushesPopsWithShift() { TMPMCRingQueue<SizeBits> realQueue; - TQueueAdaptor<SizeBits> adaptor(realQueue); + TQueueAdaptor<SizeBits> adaptor(&realQueue); for (ui32 it = 0; it < MaxSize; ++it) { for (ui32 idx = 0; idx < MaxSize - 1; ++idx) { @@ -133,7 +84,7 @@ namespace { // Tests static void PushesOverloadPops() { TMPMCRingQueue<SizeBits> realQueue; - TQueueAdaptor<SizeBits> adaptor(realQueue); + TQueueAdaptor<SizeBits> adaptor(&realQueue); for (ui32 it = 0; it < MaxSize; ++it) { for (ui32 idx = 0; idx < MaxSize; ++idx) { @@ -172,7 +123,7 @@ namespace { // Tests return; } TMPMCRingQueue<SizeBits> realQueue; - TQueueAdaptor<SizeBits> adaptor(realQueue); + TQueueAdaptor<SizeBits> adaptor(&realQueue); ui64 emptyZeroGeneration = (ui64(1) << 63); @@ -206,7 +157,7 @@ namespace { // Tests static void CheckSlowPops() { TMPMCRingQueue<SizeBits> realQueue; - TQueueAdaptor<SizeBits> adaptor(realQueue); + TQueueAdaptor<SizeBits> adaptor(&realQueue); ui64 emptyZeroGeneration = (ui64(1) << 63); for (ui32 it = 0; it < MaxSize; ++it) { for (ui32 idx = 0; idx < MaxSize; ++idx) { @@ -224,7 +175,7 @@ namespace { // Tests static void CheckFastPops() { TMPMCRingQueue<SizeBits> realQueue; - TQueueAdaptor<SizeBits> adaptor(realQueue); + TQueueAdaptor<SizeBits> adaptor(&realQueue); for (ui32 it = 0; it < MaxSize; ++it) { for (ui32 idx = 0; idx < MaxSize; ++idx) { ui64 emptyCurrentGeneration = (ui64(1) << 63) + (ui64)it; @@ -304,7 +255,7 @@ namespace { // Tests constexpr ui32 SizeBits = 3; -Y_UNIT_TEST_SUITE(MPMCRingQueueTests) { +Y_UNIT_TEST_SUITE(MPMCRingQueueSingleThreadTests) { BASIC_QUEUE_TEST_CASES(SizeBits, TSingleQueue); BASIC_QUEUE_TEST_CASES(SizeBits, TVerySlowQueue); BASIC_QUEUE_TEST_CASES(SizeBits, TSlowQueue); @@ -322,8 +273,8 @@ Y_UNIT_TEST_SUITE(MPMCRingQueueTests) { TestRandomUsage( 10'000, (ui64(1) << SizeBits), - TVeryFastQueue<SizeBits>(realQueue), - TFastQueue<SizeBits>(realQueue) + TVeryFastQueue<SizeBits>(&realQueue), + TFastQueue<SizeBits>(&realQueue) ); } @@ -332,8 +283,8 @@ Y_UNIT_TEST_SUITE(MPMCRingQueueTests) { TestRandomUsage( 10'000, (ui64(1) << SizeBits), - TVerySlowQueue<SizeBits>(realQueue), - TSlowQueue<SizeBits>(realQueue) + TVerySlowQueue<SizeBits>(&realQueue), + TSlowQueue<SizeBits>(&realQueue) ); } @@ -342,10 +293,10 @@ Y_UNIT_TEST_SUITE(MPMCRingQueueTests) { TestRandomUsage( 100'000, (ui64(1) << SizeBits), - TVerySlowQueue<SizeBits>(realQueue), - TSlowQueue<SizeBits>(realQueue), - TFastQueue<SizeBits>(realQueue), - TVeryFastQueue<SizeBits>(realQueue) + TVerySlowQueue<SizeBits>(&realQueue), + TSlowQueue<SizeBits>(&realQueue), + TFastQueue<SizeBits>(&realQueue), + TVeryFastQueue<SizeBits>(&realQueue) ); } } diff --git a/ydb/library/actors/queues/ut/ya.make b/ydb/library/actors/queues/ut/ya.make index ee73ff5fd24..c5c4c5cd3e7 100644 --- a/ydb/library/actors/queues/ut/ya.make +++ b/ydb/library/actors/queues/ut/ya.make @@ -6,7 +6,8 @@ IF (WITH_VALGRIND) ENDIF() SRCS( - mpmc_ring_queue_ut.cpp + mpmc_ring_queue_ut_single_thread.cpp + mpmc_ring_queue_ut_multi_threads.cpp ) END() diff --git a/ydb/library/actors/queues/ya.make b/ydb/library/actors/queues/ya.make index 621607abe22..e7dee4eca57 100644 --- a/ydb/library/actors/queues/ya.make +++ b/ydb/library/actors/queues/ya.make @@ -2,7 +2,7 @@ LIBRARY() SRCS( activation_queue.h - mpmc_ring_queue.h + mpmc_ring_queue.cpp ) END() |
