diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-03-15 14:04:15 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-03-15 14:15:37 +0300 |
commit | 5961b09dcf420a5e2e533a6b0794419d5ae6e5bb (patch) | |
tree | c390d9f9675b9f19383a8f209056195a8adf0aca | |
parent | 6f92f911a79d34c7e180e4db91c7fcecc864c4f0 (diff) | |
download | ydb-5961b09dcf420a5e2e533a6b0794419d5ae6e5bb.tar.gz |
YT-21277: Make suspendable action queue unit tests more deterministic
f391d03c0bbd0c69ab2f8ec9c4cf270c0dde187d
-rw-r--r-- | yt/yt/core/concurrency/single_queue_scheduler_thread.cpp | 3 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp | 129 |
2 files changed, 86 insertions, 46 deletions
diff --git a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp index df1a18964b..48525b477a 100644 --- a/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/single_queue_scheduler_thread.cpp @@ -81,10 +81,10 @@ TFuture<void> TSuspendableSingleQueueSchedulerThread<TQueueImpl>::Suspend(bool i template <class TQueueImpl> void TSuspendableSingleQueueSchedulerThread<TQueueImpl>::Resume() { - YT_VERIFY(Suspending_); YT_VERIFY(SuspendedPromise_.IsSet()); auto guard = Guard(Lock_); + YT_VERIFY(Suspending_); Suspending_ = false; SuspendImmediately_ = false; @@ -118,7 +118,6 @@ TClosure TSuspendableSingleQueueSchedulerThread<TQueueImpl>::BeginExecute() SuspendedPromise_.Set(); resumeEvent = ResumeEvent_; } - resumeEvent->Wait(); } diff --git a/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp b/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp index 58aac2557e..47269a7523 100644 --- a/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp +++ b/yt/yt/core/concurrency/unittests/suspendable_action_queue_ut.cpp @@ -31,6 +31,47 @@ protected: TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15))); } + + void EnsureRunning() { + WaitFor(BIND([] {}).AsyncVia(Invoker_).Run()).ThrowOnError(); + } + + struct TLooper + { + TClosure Callback; + IInvokerPtr Invoker; + TPromise<void> StopPromise; + std::unique_ptr<std::atomic<bool>> StopRespawning = std::make_unique<std::atomic<bool>>(false); + + TClosure MakeCallback() const + { + return BIND([this] () mutable { + Run(); + }); + } + + void Start() { + Invoker->Invoke(MakeCallback()); + } + + TFuture<void> Stop() { + StopPromise = NewPromise<void>(); + StopRespawning->store(true); + return StopPromise.ToFuture(); + } + + void Run() const + { + if (StopRespawning->load()) { + StopPromise.Set(); + return; + } + if (Callback) { + Callback(); + } + Invoker->Invoke(MakeCallback()); + } + }; }; TEST_F(TSuspendableActionQueueTest, Simple) @@ -48,37 +89,37 @@ TEST_F(TSuspendableActionQueueTest, Simple) TEST_F(TSuspendableActionQueueTest, SuspendResume) { std::atomic<i64> x = 0; - auto future = BIND([&x] { - while (true) { - ++x; - Yield(); - } - }) - .AsyncVia(Invoker_) - .Run(); + TLooper looper{ + .Callback = BIND([&x] { + x.fetch_add(1); + }), + .Invoker = Invoker_, + }; - TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15))); + looper.Start(); + + EnsureRunning(); Queue_->Suspend(/*immediate*/ true) .Get() .ThrowOnError(); - i64 x1 = x; + i64 x1 = x.load(); EXPECT_GT(x1, 0); - TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15))); + RandomSleep(); - i64 x2 = x; + i64 x2 = x.load(); EXPECT_EQ(x2, x1); Queue_->Resume(); - TDelayedExecutor::WaitForDuration(RandomDuration(TDuration::MilliSeconds(15))); + EnsureRunning(); - i64 x3 = x; + i64 x3 = x.load(); EXPECT_GT(x3, x2); - future.Cancel(TError("Test ended")); + looper.Stop().Get().ThrowOnError(); } TEST_F(TSuspendableActionQueueTest, SuspendEmptyQueue) @@ -105,60 +146,60 @@ TEST_F(TSuspendableActionQueueTest, SuspendEmptyQueue) TEST_F(TSuspendableActionQueueTest, NotImmediateSuspend) { - std::atomic<i64> x = 0; - Invoker_->Invoke(BIND([&x] { - for (int iteration = 0; iteration < 50; ++iteration) { - ++x; + std::atomic<i64> progress{0}; + TLooper looper{ + .Callback = BIND([&progress] { + progress.fetch_add(1); + }), + .Invoker = Invoker_, + }; - Sleep(TDuration::MilliSeconds(10)); + looper.Start(); - Yield(); - } - })); - - auto future = Queue_->Suspend(/*immedidate*/ false); + auto current_progress = progress.load(); + auto future = Queue_->Suspend(/*immediately*/ false); - TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); + EnsureRunning(); EXPECT_FALSE(future.IsSet()); - future - .Get() - .ThrowOnError(); + looper.Stop().Get().ThrowOnError(); + + future.Get().ThrowOnError(); - EXPECT_EQ(x, 50); + EXPECT_GE(progress.load(), current_progress); Queue_->Resume(); } TEST_F(TSuspendableActionQueueTest, PromoteSuspendToImmediate) { - i64 x = 0; - auto future = BIND([&x] { - while (true) { - ++x; - Yield(); - } - }) - .AsyncVia(Invoker_) - .Run(); - + std::atomic<i64> progress{0}; + TLooper looper{ + .Callback = BIND([&progress] { + progress.fetch_add(1); + }), + .Invoker = Invoker_, + }; + looper.Start(); + + auto current_progress = progress.load(); auto suspendFuture = Queue_->Suspend(/*immedidate*/ false); - TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); + EnsureRunning(); EXPECT_FALSE(suspendFuture.IsSet()); + EXPECT_GE(progress.load(), current_progress); Queue_->Suspend(/*immediately*/ true) .Get() .ThrowOnError(); EXPECT_TRUE(suspendFuture.IsSet()); - EXPECT_GT(x, 0); + auto stopFuture = looper.Stop(); Queue_->Resume(); - - future.Cancel(TError("Test ended")); + stopFuture.Get().ThrowOnError(); } TEST_F(TSuspendableActionQueueTest, StressTest1) |