diff options
author | pavook <pavook@yandex-team.com> | 2025-02-24 20:40:01 +0300 |
---|---|---|
committer | pavook <pavook@yandex-team.com> | 2025-02-24 21:58:25 +0300 |
commit | 460453e4b8c826967427a69464b30097a36dd2c2 (patch) | |
tree | 56064cc1fd3bc8084a43dd09ade764798590b80e | |
parent | c020841349085b07e6b43c80346861490de13a63 (diff) | |
download | ydb-460453e4b8c826967427a69464b30097a36dd2c2.tar.gz |
Add TPeriodicExecutor::StartAndGetFirstExecutedEvent
commit_hash:b28f770f6a84bfe4eee590c6cd3cfba4df933928
-rw-r--r-- | yt/yt/core/concurrency/periodic_executor_base-inl.h | 30 | ||||
-rw-r--r-- | yt/yt/core/concurrency/periodic_executor_base.h | 7 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/periodic_ut.cpp | 116 |
3 files changed, 143 insertions, 10 deletions
diff --git a/yt/yt/core/concurrency/periodic_executor_base-inl.h b/yt/yt/core/concurrency/periodic_executor_base-inl.h index 4133497c2f..022dadf503 100644 --- a/yt/yt/core/concurrency/periodic_executor_base-inl.h +++ b/yt/yt/core/concurrency/periodic_executor_base-inl.h @@ -25,18 +25,25 @@ TPeriodicExecutorBase<TInvocationTimePolicy>::TPeriodicExecutorBase( template <CInvocationTimePolicy TInvocationTimePolicy> void TPeriodicExecutorBase<TInvocationTimePolicy>::Start() { + YT_UNUSED_FUTURE(StartAndGetFirstExecutedEvent()); +} + +template <CInvocationTimePolicy TInvocationTimePolicy> +TFuture<void> TPeriodicExecutorBase<TInvocationTimePolicy>::StartAndGetFirstExecutedEvent() +{ auto guard = Guard(SpinLock_); - if (Started_) { - return; + if (!Started_) { + FirstExecutedEventPromise_ = NewPromise<void>(); + ExecutedPromise_ = TPromise<void>(); + IdlePromise_ = TPromise<void>(); + Started_ = true; + if (TInvocationTimePolicy::IsEnabled()) { + PostDelayedCallback(TInvocationTimePolicy::KickstartDeadline()); + } } - ExecutedPromise_ = TPromise<void>(); - IdlePromise_ = TPromise<void>(); - Started_ = true; - if (TInvocationTimePolicy::IsEnabled()) { - PostDelayedCallback(TInvocationTimePolicy::KickstartDeadline()); - } + return FirstExecutedEventPromise_.ToFuture().ToUncancelable(); } template <CInvocationTimePolicy TInvocationTimePolicy> @@ -58,6 +65,7 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::DoStop(TGuard<NThreading::TSp TInvocationTimePolicy::Reset(); auto executedPromise = ExecutedPromise_; + auto firstExecutedEventPromise = FirstExecutedEventPromise_; auto executionCanceler = ExecutionCanceler_; TDelayedExecutor::CancelAndClear(Cookie_); @@ -67,6 +75,8 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::DoStop(TGuard<NThreading::TSp executedPromise.TrySet(MakeStoppedError()); } + firstExecutedEventPromise.TrySet(MakeStoppedError()); + if (executionCanceler) { executionCanceler(MakeStoppedError()); } @@ -207,9 +217,11 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::RunCallback() } TPromise<void> idlePromise; + TPromise<void> firstExecutedEventPromise; { auto guard = Guard(SpinLock_); idlePromise = IdlePromise_; + firstExecutedEventPromise = FirstExecutedEventPromise_; ExecutingCallback_ = false; ExecutionCanceler_.Reset(); } @@ -222,6 +234,8 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::RunCallback() executedPromise.TrySet(); } + firstExecutedEventPromise.TrySet(); + auto guard = Guard(SpinLock_); YT_VERIFY(Busy_); diff --git a/yt/yt/core/concurrency/periodic_executor_base.h b/yt/yt/core/concurrency/periodic_executor_base.h index ddf6a853e8..32ce9522e8 100644 --- a/yt/yt/core/concurrency/periodic_executor_base.h +++ b/yt/yt/core/concurrency/periodic_executor_base.h @@ -88,6 +88,12 @@ public: //! Starts the instance. void Start(); + //! Starts the instance. Returns a future that becomes set when the first callback + //! invocation since the last stop finishes. + //! If the call arrives to an already started executor, the future stays the same + //! and still corresponds to the first invocation. + TFuture<void> StartAndGetFirstExecutedEvent(); + bool IsStarted() const; //! Stops the instance, cancels all subsequent invocations. @@ -134,6 +140,7 @@ private: TDelayedExecutorCookie Cookie_; TPromise<void> IdlePromise_; TPromise<void> ExecutedPromise_; + TPromise<void> FirstExecutedEventPromise_; void DoStop(TGuard<NThreading::TSpinLock>& guard); diff --git a/yt/yt/core/concurrency/unittests/periodic_ut.cpp b/yt/yt/core/concurrency/unittests/periodic_ut.cpp index fb452bfc6e..ce60ff52e0 100644 --- a/yt/yt/core/concurrency/unittests/periodic_ut.cpp +++ b/yt/yt/core/concurrency/unittests/periodic_ut.cpp @@ -7,6 +7,7 @@ #include <yt/yt/core/concurrency/delayed_executor.h> #include <yt/yt/core/concurrency/periodic_executor.h> #include <yt/yt/core/concurrency/scheduler.h> +#include <yt/yt/core/concurrency/thread_pool.h> #include <yt/yt/core/misc/lazy_ptr.h> @@ -18,6 +19,11 @@ namespace NYT::NConcurrency { namespace { +using ::testing::Each; +using ::testing::IsFalse; +using ::testing::IsTrue; +using ::testing::Property; + //////////////////////////////////////////////////////////////////////////////// class TPeriodicTest @@ -41,8 +47,9 @@ TEST_W(TPeriodicTest, Simple) callback, TDuration::MilliSeconds(100)); - executor->Start(); + auto firstExecution = executor->StartAndGetFirstExecutedEvent(); TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(600)); + EXPECT_TRUE(firstExecution.IsSet()); WaitFor(executor->Stop()) .ThrowOnError(); EXPECT_EQ(2, count.load()); @@ -60,6 +67,13 @@ TEST_W(TPeriodicTest, Simple) EXPECT_EQ(6, count.load()); WaitFor(executor->Stop()) .ThrowOnError(); + EXPECT_EQ(6, count.load()); + + WaitFor(executor->StartAndGetFirstExecutedEvent()) + .ThrowOnError(); + EXPECT_EQ(7, count.load()); + WaitFor(executor->Stop()) + .ThrowOnError(); } TEST_W(TPeriodicTest, SimpleScheduleOutOfBand) @@ -90,6 +104,66 @@ TEST_W(TPeriodicTest, SimpleScheduleOutOfBand) EXPECT_EQ(2, count.load()); } +TEST_W(TPeriodicTest, ParallelStart) +{ + static constexpr int ThreadCount = 4; + + TPromise<void> threadStartBarrier = NewPromise<void>(); + TPromise<void> callbackStartBarrier = NewPromise<void>(); + TPromise<void> callbackEndBarrier = NewPromise<void>(); + std::atomic<int> countStarted = {0}; + std::atomic<int> countFinished = {0}; + std::atomic<int> countWaiting = {0}; + + auto callback = BIND([&] { + ++countStarted; + threadStartBarrier.ToFuture().Get(); + callbackStartBarrier.Set(); + callbackEndBarrier.ToFuture().Get(); + ++countFinished; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TPeriodicExecutor>( + actionQueue->GetInvoker(), + callback, + TDuration::MilliSeconds(200)); + + auto startCallback = BIND([&] { + auto result = executor->StartAndGetFirstExecutedEvent(); + if (++countWaiting == ThreadCount) { + threadStartBarrier.Set(); + } + return result; + }); + + auto threadPool = CreateThreadPool(ThreadCount, "test"); + + std::vector<TFuture<void>> futures; + for (int i = 0; i < ThreadCount; ++i) { + futures.push_back(startCallback.AsyncVia(threadPool->GetInvoker()).Run()); + } + + // Check that start futures are set correctly in all threads + // after the first execution, but before the second one. + + callbackStartBarrier.ToFuture().Get(); + EXPECT_THAT( + futures, + Each( + Property(&TFuture<void>::IsSet, IsFalse()))); + callbackEndBarrier.Set(); + threadStartBarrier = NewPromise<void>(); + WaitFor(AllSucceeded(futures)) + .ThrowOnError(); + EXPECT_EQ(1, countStarted.load()); + EXPECT_EQ(1, countFinished.load()); + threadStartBarrier.Set(); + + WaitFor(executor->Stop()) + .ThrowOnError(); +} + TEST_W(TPeriodicTest, ParallelStop) { std::atomic<int> count = {0}; @@ -233,6 +307,34 @@ TEST_W(TPeriodicTest, OnExecutedEventCanceled) EXPECT_EQ(2, count.load()); } +TEST_W(TPeriodicTest, OnStartCancelled) +{ + auto callbackStarted = NewPromise<void>(); + + auto callback = BIND([&] { + callbackStarted.Set(); + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TPeriodicExecutor>( + actionQueue->GetInvoker(), + callback, + TDuration::MilliSeconds(200)); + + auto startFuture1 = executor->StartAndGetFirstExecutedEvent(); + auto startFuture2 = executor->StartAndGetFirstExecutedEvent(); + startFuture1.Cancel(TError(NYT::EErrorCode::Canceled, "Canceled")); + + // NB(pavook): cancellation of a start future shouldn't cause an executor stop + // and should not propagate to the underlying promise (and other futures). + EXPECT_TRUE(callbackStarted.ToFuture().Get().IsOK()); + EXPECT_TRUE(executor->IsStarted()); + EXPECT_TRUE(startFuture2.IsSet()); + EXPECT_TRUE(startFuture2.Get().IsOK()); + WaitFor(executor->Stop()) + .ThrowOnError(); +} + TEST_W(TPeriodicTest, Stop) { auto neverSetPromise = NewPromise<void>(); @@ -248,7 +350,7 @@ TEST_W(TPeriodicTest, Stop) callback, TDuration::MilliSeconds(100)); - executor->Start(); + auto startFuture = executor->StartAndGetFirstExecutedEvent(); // Wait for the callback to enter WaitFor. Sleep(TDuration::MilliSeconds(100)); WaitFor(executor->Stop()) @@ -256,6 +358,16 @@ TEST_W(TPeriodicTest, Stop) EXPECT_TRUE(immediatelyCancelableFuture.IsSet()); EXPECT_EQ(NYT::EErrorCode::Canceled, immediatelyCancelableFuture.Get().GetCode()); + EXPECT_FALSE(startFuture.Get().IsOK()); + EXPECT_EQ(NYT::EErrorCode::Canceled, startFuture.Get().GetCode()); + + startFuture = executor->StartAndGetFirstExecutedEvent(); + Sleep(TDuration::MilliSeconds(200)); + WaitFor(executor->Stop()) + .ThrowOnError(); + // startFuture should be set after the first execution. + EXPECT_TRUE(startFuture.IsSet()); + EXPECT_TRUE(startFuture.Get().IsOK()); } //////////////////////////////////////////////////////////////////////////////// |