aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-05-06 21:22:46 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-05-06 21:30:58 +0300
commitd37324cee12fd5289205d6c5a9486e6130b6a72b (patch)
treeccfbc9b7eaa80c9d2c9a8f122b755bcbdea018be
parent86fa90c3f199cb8af7bb07188a11da49f5e8e984 (diff)
downloadydb-d37324cee12fd5289205d6c5a9486e6130b6a72b.tar.gz
Add tests for mpsc intrusive stack
fb15b88d806292bb587924e17229fa859dd361c7
-rw-r--r--yt/yt/core/concurrency/fiber.cpp2
-rw-r--r--yt/yt/core/misc/intrusive_mpsc_stack-inl.h6
-rw-r--r--yt/yt/core/misc/intrusive_mpsc_stack.h4
-rw-r--r--yt/yt/core/misc/unittests/mpsc_stack_ut.cpp84
4 files changed, 90 insertions, 6 deletions
diff --git a/yt/yt/core/concurrency/fiber.cpp b/yt/yt/core/concurrency/fiber.cpp
index e257e4a2ef..0bff308d4c 100644
--- a/yt/yt/core/concurrency/fiber.cpp
+++ b/yt/yt/core/concurrency/fiber.cpp
@@ -119,7 +119,7 @@ private:
class TFiberRegistry
{
template <class Tag>
- using TFiberStack = TIntrusiveMPSCStack<TFiber, Tag>;
+ using TFiberStack = TIntrusiveMpscStack<TFiber, Tag>;
public:
//! Do not rename, change the signature, or drop Y_NO_INLINE.
diff --git a/yt/yt/core/misc/intrusive_mpsc_stack-inl.h b/yt/yt/core/misc/intrusive_mpsc_stack-inl.h
index f294557942..03c97a12a7 100644
--- a/yt/yt/core/misc/intrusive_mpsc_stack-inl.h
+++ b/yt/yt/core/misc/intrusive_mpsc_stack-inl.h
@@ -11,13 +11,13 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
template <class T, class Tag>
-TIntrusiveMPSCStack<T, Tag>::TIntrusiveMPSCStack() noexcept
+TIntrusiveMpscStack<T, Tag>::TIntrusiveMpscStack() noexcept
{
static_assert(std::derived_from<T, TIntrusiveListItem<T, Tag>>, "Class must inherit from CRTP-base TIntrusiveListItem");
}
template <class T, class Tag>
-void TIntrusiveMPSCStack<T, Tag>::Push(TNode* item) noexcept
+void TIntrusiveMpscStack<T, Tag>::Push(TNode* item) noexcept
{
// Past this line item is not a valid instance of TInstrusiveListItem.
@@ -33,7 +33,7 @@ void TIntrusiveMPSCStack<T, Tag>::Push(TNode* item) noexcept
}
template <class T, class Tag>
-TIntrusiveList<T, Tag> TIntrusiveMPSCStack<T, Tag>::PopAll() noexcept
+TIntrusiveList<T, Tag> TIntrusiveMpscStack<T, Tag>::PopAll() noexcept
{
TNode* head = Head_.exchange(nullptr, std::memory_order::acquire);
diff --git a/yt/yt/core/misc/intrusive_mpsc_stack.h b/yt/yt/core/misc/intrusive_mpsc_stack.h
index 0b54e13262..1e2f44781f 100644
--- a/yt/yt/core/misc/intrusive_mpsc_stack.h
+++ b/yt/yt/core/misc/intrusive_mpsc_stack.h
@@ -9,12 +9,12 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
template <class T, class Tag = TIntrusiveListDefaultTag>
-class TIntrusiveMPSCStack
+class TIntrusiveMpscStack
{
using TNode = TIntrusiveListItem<T, Tag>;
public:
- TIntrusiveMPSCStack() noexcept;
+ TIntrusiveMpscStack() noexcept;
void Push(TNode* item) noexcept;
diff --git a/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp b/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp
index 903e9e16c7..0cbaed1c90 100644
--- a/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp
+++ b/yt/yt/core/misc/unittests/mpsc_stack_ut.cpp
@@ -1,5 +1,6 @@
#include <yt/yt/core/test_framework/framework.h>
+#include <yt/yt/core/misc/intrusive_mpsc_stack.h>
#include <yt/yt/core/misc/mpsc_stack.h>
#include <thread>
@@ -133,5 +134,88 @@ TEST(TMpscStackTest, DequeueFiltered)
////////////////////////////////////////////////////////////////////////////////
+template <class Tag>
+struct TWidget
+ : public TIntrusiveListItem<TWidget<Tag>, Tag>
+{
+ int Value;
+};
+
+struct Tag1
+{ };
+
+struct Tag2
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TIntrusiveMpscStackTest, Simple)
+{
+ TWidget<Tag1> one{.Value = 1};
+ TWidget<Tag1> two{.Value = 2};
+
+ TIntrusiveMpscStack<TWidget<Tag1>, Tag1> stack;
+
+ stack.Push(&one);
+ stack.Push(&two);
+
+ auto list = stack.PopAll();
+
+ EXPECT_EQ(list.Size(), 2u);
+
+ auto twoPtr = list.PopBack();
+ auto onePtr = list.PopBack();
+
+ EXPECT_EQ(onePtr, &one);
+ EXPECT_EQ(twoPtr, &two);
+
+ stack.Push(twoPtr);
+
+ list = stack.PopAll();
+ EXPECT_EQ(list.Size(), 1u);
+
+ onePtr = list.PopBack();
+ EXPECT_EQ(onePtr, twoPtr);
+}
+
+TEST(TIntrusiveMpscStackTest, ConcurrentTryDequeue)
+{
+ constexpr i64 Size = 100'000;
+
+ TIntrusiveMpscStack<TWidget<Tag1>, Tag1> stack;
+
+ auto run = [&] {
+ Sleep(TDuration::MilliSeconds(50));
+ for (int i = 0; i < Size; ++i) {
+ stack.Push(new TWidget<Tag1>(TWidget<Tag1>{
+ .Value = i,
+ }));
+ }
+ };
+
+ std::thread t1(run);
+ std::thread t2(run);
+
+ i64 sum = 0;
+ i64 expectedSum = Size * (Size - 1);
+
+ while (sum < expectedSum) {
+ auto list = stack.PopAll();
+
+ while (!list.Empty()) {
+ auto item = list.PopBack();
+ sum += item->Value;
+ delete item;
+ }
+ }
+
+ EXPECT_EQ(sum, expectedSum);
+
+ t1.join();
+ t2.join();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace
} // namespace NYT