aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpavook <pavook@yandex-team.com>2025-02-24 20:40:01 +0300
committerpavook <pavook@yandex-team.com>2025-02-24 21:58:25 +0300
commit460453e4b8c826967427a69464b30097a36dd2c2 (patch)
tree56064cc1fd3bc8084a43dd09ade764798590b80e
parentc020841349085b07e6b43c80346861490de13a63 (diff)
downloadydb-460453e4b8c826967427a69464b30097a36dd2c2.tar.gz
Add TPeriodicExecutor::StartAndGetFirstExecutedEvent
commit_hash:b28f770f6a84bfe4eee590c6cd3cfba4df933928
-rw-r--r--yt/yt/core/concurrency/periodic_executor_base-inl.h30
-rw-r--r--yt/yt/core/concurrency/periodic_executor_base.h7
-rw-r--r--yt/yt/core/concurrency/unittests/periodic_ut.cpp116
3 files changed, 143 insertions, 10 deletions
diff --git a/yt/yt/core/concurrency/periodic_executor_base-inl.h b/yt/yt/core/concurrency/periodic_executor_base-inl.h
index 4133497c2f..022dadf503 100644
--- a/yt/yt/core/concurrency/periodic_executor_base-inl.h
+++ b/yt/yt/core/concurrency/periodic_executor_base-inl.h
@@ -25,18 +25,25 @@ TPeriodicExecutorBase<TInvocationTimePolicy>::TPeriodicExecutorBase(
template <CInvocationTimePolicy TInvocationTimePolicy>
void TPeriodicExecutorBase<TInvocationTimePolicy>::Start()
{
+ YT_UNUSED_FUTURE(StartAndGetFirstExecutedEvent());
+}
+
+template <CInvocationTimePolicy TInvocationTimePolicy>
+TFuture<void> TPeriodicExecutorBase<TInvocationTimePolicy>::StartAndGetFirstExecutedEvent()
+{
auto guard = Guard(SpinLock_);
- if (Started_) {
- return;
+ if (!Started_) {
+ FirstExecutedEventPromise_ = NewPromise<void>();
+ ExecutedPromise_ = TPromise<void>();
+ IdlePromise_ = TPromise<void>();
+ Started_ = true;
+ if (TInvocationTimePolicy::IsEnabled()) {
+ PostDelayedCallback(TInvocationTimePolicy::KickstartDeadline());
+ }
}
- ExecutedPromise_ = TPromise<void>();
- IdlePromise_ = TPromise<void>();
- Started_ = true;
- if (TInvocationTimePolicy::IsEnabled()) {
- PostDelayedCallback(TInvocationTimePolicy::KickstartDeadline());
- }
+ return FirstExecutedEventPromise_.ToFuture().ToUncancelable();
}
template <CInvocationTimePolicy TInvocationTimePolicy>
@@ -58,6 +65,7 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::DoStop(TGuard<NThreading::TSp
TInvocationTimePolicy::Reset();
auto executedPromise = ExecutedPromise_;
+ auto firstExecutedEventPromise = FirstExecutedEventPromise_;
auto executionCanceler = ExecutionCanceler_;
TDelayedExecutor::CancelAndClear(Cookie_);
@@ -67,6 +75,8 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::DoStop(TGuard<NThreading::TSp
executedPromise.TrySet(MakeStoppedError());
}
+ firstExecutedEventPromise.TrySet(MakeStoppedError());
+
if (executionCanceler) {
executionCanceler(MakeStoppedError());
}
@@ -207,9 +217,11 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::RunCallback()
}
TPromise<void> idlePromise;
+ TPromise<void> firstExecutedEventPromise;
{
auto guard = Guard(SpinLock_);
idlePromise = IdlePromise_;
+ firstExecutedEventPromise = FirstExecutedEventPromise_;
ExecutingCallback_ = false;
ExecutionCanceler_.Reset();
}
@@ -222,6 +234,8 @@ void TPeriodicExecutorBase<TInvocationTimePolicy>::RunCallback()
executedPromise.TrySet();
}
+ firstExecutedEventPromise.TrySet();
+
auto guard = Guard(SpinLock_);
YT_VERIFY(Busy_);
diff --git a/yt/yt/core/concurrency/periodic_executor_base.h b/yt/yt/core/concurrency/periodic_executor_base.h
index ddf6a853e8..32ce9522e8 100644
--- a/yt/yt/core/concurrency/periodic_executor_base.h
+++ b/yt/yt/core/concurrency/periodic_executor_base.h
@@ -88,6 +88,12 @@ public:
//! Starts the instance.
void Start();
+ //! Starts the instance. Returns a future that becomes set when the first callback
+ //! invocation since the last stop finishes.
+ //! If the call arrives to an already started executor, the future stays the same
+ //! and still corresponds to the first invocation.
+ TFuture<void> StartAndGetFirstExecutedEvent();
+
bool IsStarted() const;
//! Stops the instance, cancels all subsequent invocations.
@@ -134,6 +140,7 @@ private:
TDelayedExecutorCookie Cookie_;
TPromise<void> IdlePromise_;
TPromise<void> ExecutedPromise_;
+ TPromise<void> FirstExecutedEventPromise_;
void DoStop(TGuard<NThreading::TSpinLock>& guard);
diff --git a/yt/yt/core/concurrency/unittests/periodic_ut.cpp b/yt/yt/core/concurrency/unittests/periodic_ut.cpp
index fb452bfc6e..ce60ff52e0 100644
--- a/yt/yt/core/concurrency/unittests/periodic_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/periodic_ut.cpp
@@ -7,6 +7,7 @@
#include <yt/yt/core/concurrency/delayed_executor.h>
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <yt/yt/core/concurrency/scheduler.h>
+#include <yt/yt/core/concurrency/thread_pool.h>
#include <yt/yt/core/misc/lazy_ptr.h>
@@ -18,6 +19,11 @@
namespace NYT::NConcurrency {
namespace {
+using ::testing::Each;
+using ::testing::IsFalse;
+using ::testing::IsTrue;
+using ::testing::Property;
+
////////////////////////////////////////////////////////////////////////////////
class TPeriodicTest
@@ -41,8 +47,9 @@ TEST_W(TPeriodicTest, Simple)
callback,
TDuration::MilliSeconds(100));
- executor->Start();
+ auto firstExecution = executor->StartAndGetFirstExecutedEvent();
TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(600));
+ EXPECT_TRUE(firstExecution.IsSet());
WaitFor(executor->Stop())
.ThrowOnError();
EXPECT_EQ(2, count.load());
@@ -60,6 +67,13 @@ TEST_W(TPeriodicTest, Simple)
EXPECT_EQ(6, count.load());
WaitFor(executor->Stop())
.ThrowOnError();
+ EXPECT_EQ(6, count.load());
+
+ WaitFor(executor->StartAndGetFirstExecutedEvent())
+ .ThrowOnError();
+ EXPECT_EQ(7, count.load());
+ WaitFor(executor->Stop())
+ .ThrowOnError();
}
TEST_W(TPeriodicTest, SimpleScheduleOutOfBand)
@@ -90,6 +104,66 @@ TEST_W(TPeriodicTest, SimpleScheduleOutOfBand)
EXPECT_EQ(2, count.load());
}
+TEST_W(TPeriodicTest, ParallelStart)
+{
+ static constexpr int ThreadCount = 4;
+
+ TPromise<void> threadStartBarrier = NewPromise<void>();
+ TPromise<void> callbackStartBarrier = NewPromise<void>();
+ TPromise<void> callbackEndBarrier = NewPromise<void>();
+ std::atomic<int> countStarted = {0};
+ std::atomic<int> countFinished = {0};
+ std::atomic<int> countWaiting = {0};
+
+ auto callback = BIND([&] {
+ ++countStarted;
+ threadStartBarrier.ToFuture().Get();
+ callbackStartBarrier.Set();
+ callbackEndBarrier.ToFuture().Get();
+ ++countFinished;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TPeriodicExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ TDuration::MilliSeconds(200));
+
+ auto startCallback = BIND([&] {
+ auto result = executor->StartAndGetFirstExecutedEvent();
+ if (++countWaiting == ThreadCount) {
+ threadStartBarrier.Set();
+ }
+ return result;
+ });
+
+ auto threadPool = CreateThreadPool(ThreadCount, "test");
+
+ std::vector<TFuture<void>> futures;
+ for (int i = 0; i < ThreadCount; ++i) {
+ futures.push_back(startCallback.AsyncVia(threadPool->GetInvoker()).Run());
+ }
+
+ // Check that start futures are set correctly in all threads
+ // after the first execution, but before the second one.
+
+ callbackStartBarrier.ToFuture().Get();
+ EXPECT_THAT(
+ futures,
+ Each(
+ Property(&TFuture<void>::IsSet, IsFalse())));
+ callbackEndBarrier.Set();
+ threadStartBarrier = NewPromise<void>();
+ WaitFor(AllSucceeded(futures))
+ .ThrowOnError();
+ EXPECT_EQ(1, countStarted.load());
+ EXPECT_EQ(1, countFinished.load());
+ threadStartBarrier.Set();
+
+ WaitFor(executor->Stop())
+ .ThrowOnError();
+}
+
TEST_W(TPeriodicTest, ParallelStop)
{
std::atomic<int> count = {0};
@@ -233,6 +307,34 @@ TEST_W(TPeriodicTest, OnExecutedEventCanceled)
EXPECT_EQ(2, count.load());
}
+TEST_W(TPeriodicTest, OnStartCancelled)
+{
+ auto callbackStarted = NewPromise<void>();
+
+ auto callback = BIND([&] {
+ callbackStarted.Set();
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TPeriodicExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ TDuration::MilliSeconds(200));
+
+ auto startFuture1 = executor->StartAndGetFirstExecutedEvent();
+ auto startFuture2 = executor->StartAndGetFirstExecutedEvent();
+ startFuture1.Cancel(TError(NYT::EErrorCode::Canceled, "Canceled"));
+
+ // NB(pavook): cancellation of a start future shouldn't cause an executor stop
+ // and should not propagate to the underlying promise (and other futures).
+ EXPECT_TRUE(callbackStarted.ToFuture().Get().IsOK());
+ EXPECT_TRUE(executor->IsStarted());
+ EXPECT_TRUE(startFuture2.IsSet());
+ EXPECT_TRUE(startFuture2.Get().IsOK());
+ WaitFor(executor->Stop())
+ .ThrowOnError();
+}
+
TEST_W(TPeriodicTest, Stop)
{
auto neverSetPromise = NewPromise<void>();
@@ -248,7 +350,7 @@ TEST_W(TPeriodicTest, Stop)
callback,
TDuration::MilliSeconds(100));
- executor->Start();
+ auto startFuture = executor->StartAndGetFirstExecutedEvent();
// Wait for the callback to enter WaitFor.
Sleep(TDuration::MilliSeconds(100));
WaitFor(executor->Stop())
@@ -256,6 +358,16 @@ TEST_W(TPeriodicTest, Stop)
EXPECT_TRUE(immediatelyCancelableFuture.IsSet());
EXPECT_EQ(NYT::EErrorCode::Canceled, immediatelyCancelableFuture.Get().GetCode());
+ EXPECT_FALSE(startFuture.Get().IsOK());
+ EXPECT_EQ(NYT::EErrorCode::Canceled, startFuture.Get().GetCode());
+
+ startFuture = executor->StartAndGetFirstExecutedEvent();
+ Sleep(TDuration::MilliSeconds(200));
+ WaitFor(executor->Stop())
+ .ThrowOnError();
+ // startFuture should be set after the first execution.
+ EXPECT_TRUE(startFuture.IsSet());
+ EXPECT_TRUE(startFuture.Get().IsOK());
}
////////////////////////////////////////////////////////////////////////////////