aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-03-15 14:04:15 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-03-15 14:15:37 +0300
commit5961b09dcf420a5e2e533a6b0794419d5ae6e5bb (patch)
treec390d9f9675b9f19383a8f209056195a8adf0aca
parent6f92f911a79d34c7e180e4db91c7fcecc864c4f0 (diff)
downloadydb-5961b09dcf420a5e2e533a6b0794419d5ae6e5bb.tar.gz
YT-21277: Make suspendable action queue unit tests more deterministic
f391d03c0bbd0c69ab2f8ec9c4cf270c0dde187d
-rw-r--r--yt/yt/core/concurrency/single_queue_scheduler_thread.cpp3
-rw-r--r--yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp129
2 files changed, 86 insertions, 46 deletions
diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
index df1a18964b..48525b477a 100644
--- a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp
@@ -81,10 +81,10 @@ TFuture<void> TSuspendableSingleQueueSchedulerThread<TQueueImpl>::Suspend(bool i
template <class TQueueImpl>
void TSuspendableSingleQueueSchedulerThread<TQueueImpl>::Resume()
{
- YT_VERIFY(Suspending_);
YT_VERIFY(SuspendedPromise_.IsSet());
auto guard = Guard(Lock_);
+ YT_VERIFY(Suspending_);
Suspending_ = false;
SuspendImmediately_ = false;
@@ -118,7 +118,6 @@ TClosure TSuspendableSingleQueueSchedulerThread<TQueueImpl>::BeginExecute()
SuspendedPromise_.Set();
resumeEvent = ResumeEvent_;
}
-
resumeEvent->Wait();
}
diff --git a/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp b/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp
index 58aac2557e..47269a7523 100644
--- a/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp
@@ -31,6 +31,47 @@ protected:
TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15)));
}
+
+ void EnsureRunning() {
+ WaitFor(BIND([] {}).AsyncVia(Invoker_).Run()).ThrowOnError();
+ }
+
+ struct TLooper
+ {
+ TClosure Callback;
+ IInvokerPtr Invoker;
+ TPromise<void> StopPromise;
+ std::unique_ptr<std::atomic<bool>> StopRespawning = std::make_unique<std::atomic<bool>>(false);
+
+ TClosure MakeCallback() const
+ {
+ return BIND([this] () mutable {
+ Run();
+ });
+ }
+
+ void Start() {
+ Invoker->Invoke(MakeCallback());
+ }
+
+ TFuture<void> Stop() {
+ StopPromise = NewPromise<void>();
+ StopRespawning->store(true);
+ return StopPromise.ToFuture();
+ }
+
+ void Run() const
+ {
+ if (StopRespawning->load()) {
+ StopPromise.Set();
+ return;
+ }
+ if (Callback) {
+ Callback();
+ }
+ Invoker->Invoke(MakeCallback());
+ }
+ };
};
TEST_F(TSuspendableActionQueueTest, Simple)
@@ -48,37 +89,37 @@ TEST_F(TSuspendableActionQueueTest, Simple)
TEST_F(TSuspendableActionQueueTest, SuspendResume)
{
std::atomic<i64> x = 0;
- auto future = BIND([&x] {
- while (true) {
- ++x;
- Yield();
- }
- })
- .AsyncVia(Invoker_)
- .Run();
+ TLooper looper{
+ .Callback = BIND([&x] {
+ x.fetch_add(1);
+ }),
+ .Invoker = Invoker_,
+ };
- TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15)));
+ looper.Start();
+
+ EnsureRunning();
Queue_->Suspend(/*immediate*/ true)
.Get()
.ThrowOnError();
- i64 x1 = x;
+ i64 x1 = x.load();
EXPECT_GT(x1, 0);
- TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15)));
+ RandomSleep();
- i64 x2 = x;
+ i64 x2 = x.load();
EXPECT_EQ(x2, x1);
Queue_->Resume();
- TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15)));
+ EnsureRunning();
- i64 x3 = x;
+ i64 x3 = x.load();
EXPECT_GT(x3, x2);
- future.Cancel(TError("Test ended"));
+ looper.Stop().Get().ThrowOnError();
}
TEST_F(TSuspendableActionQueueTest, SuspendEmptyQueue)
@@ -105,60 +146,60 @@ TEST_F(TSuspendableActionQueueTest, SuspendEmptyQueue)
TEST_F(TSuspendableActionQueueTest, NotImmediateSuspend)
{
- std::atomic<i64> x = 0;
- Invoker_->Invoke(BIND([&x] {
- for (int iteration = 0; iteration < 50; ++iteration) {
- ++x;
+ std::atomic<i64> progress{0};
+ TLooper looper{
+ .Callback = BIND([&progress] {
+ progress.fetch_add(1);
+ }),
+ .Invoker = Invoker_,
+ };
- Sleep(TDuration::MilliSeconds(10));
+ looper.Start();
- Yield();
- }
- }));
-
- auto future = Queue_->Suspend(/*immedidate*/ false);
+ auto current_progress = progress.load();
+ auto future = Queue_->Suspend(/*immediately*/ false);
- TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
+ EnsureRunning();
EXPECT_FALSE(future.IsSet());
- future
- .Get()
- .ThrowOnError();
+ looper.Stop().Get().ThrowOnError();
+
+ future.Get().ThrowOnError();
- EXPECT_EQ(x, 50);
+ EXPECT_GE(progress.load(), current_progress);
Queue_->Resume();
}
TEST_F(TSuspendableActionQueueTest, PromoteSuspendToImmediate)
{
- i64 x = 0;
- auto future = BIND([&x] {
- while (true) {
- ++x;
- Yield();
- }
- })
- .AsyncVia(Invoker_)
- .Run();
-
+ std::atomic<i64> progress{0};
+ TLooper looper{
+ .Callback = BIND([&progress] {
+ progress.fetch_add(1);
+ }),
+ .Invoker = Invoker_,
+ };
+ looper.Start();
+
+ auto current_progress = progress.load();
auto suspendFuture = Queue_->Suspend(/*immedidate*/ false);
- TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
+ EnsureRunning();
EXPECT_FALSE(suspendFuture.IsSet());
+ EXPECT_GE(progress.load(), current_progress);
Queue_->Suspend(/*immediately*/ true)
.Get()
.ThrowOnError();
EXPECT_TRUE(suspendFuture.IsSet());
- EXPECT_GT(x, 0);
+ auto stopFuture = looper.Stop();
Queue_->Resume();
-
- future.Cancel(TError("Test ended"));
+ stopFuture.Get().ThrowOnError();
}
TEST_F(TSuspendableActionQueueTest, StressTest1)