aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorachulkov2 <achulkov2@yandex-team.com>2023-09-29 00:06:49 +0300
committerachulkov2 <achulkov2@yandex-team.com>2023-09-29 00:32:49 +0300
commitf5f10c72b47a362fcf641902cca19acd406dd6ab (patch)
tree34a0f0794b6c3fac71b93737a93eb13810dc2b97
parent95fac3fb8d054fb4f0b538d358e09bbadcffbc91 (diff)
downloadydb-f5f10c72b47a362fcf641902cca19acd406dd6ab.tar.gz
YT-19517: Separate common base from TPeriodicExecutor and a new TScheduledExecutor
-rw-r--r--yt/yt/core/CMakeLists.darwin-x86_64.txt2
-rw-r--r--yt/yt/core/CMakeLists.linux-aarch64.txt2
-rw-r--r--yt/yt/core/CMakeLists.linux-x86_64.txt2
-rw-r--r--yt/yt/core/CMakeLists.windows-x86_64.txt2
-rw-r--r--yt/yt/core/concurrency/periodic_executor.cpp250
-rw-r--r--yt/yt/core/concurrency/periodic_executor.h55
-rw-r--r--yt/yt/core/concurrency/public.h1
-rw-r--r--yt/yt/core/concurrency/recurring_executor_base.cpp269
-rw-r--r--yt/yt/core/concurrency/recurring_executor_base.h90
-rw-r--r--yt/yt/core/concurrency/scheduled_executor.cpp63
-rw-r--r--yt/yt/core/concurrency/scheduled_executor.h52
-rw-r--r--yt/yt/core/concurrency/unittests/periodic_ut.cpp35
-rw-r--r--yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp263
-rw-r--r--yt/yt/core/concurrency/unittests/ya.make1
-rw-r--r--yt/yt/core/ya.make2
15 files changed, 811 insertions, 278 deletions
diff --git a/yt/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt/core/CMakeLists.darwin-x86_64.txt
index 9ee10c5a62e..0b4da77950b 100644
--- a/yt/yt/core/CMakeLists.darwin-x86_64.txt
+++ b/yt/yt/core/CMakeLists.darwin-x86_64.txt
@@ -127,6 +127,8 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp
diff --git a/yt/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt/core/CMakeLists.linux-aarch64.txt
index ca7b9ac8eb5..51eeb4ff568 100644
--- a/yt/yt/core/CMakeLists.linux-aarch64.txt
+++ b/yt/yt/core/CMakeLists.linux-aarch64.txt
@@ -127,6 +127,8 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp
diff --git a/yt/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt/core/CMakeLists.linux-x86_64.txt
index d47c1788f9e..164e626f9b2 100644
--- a/yt/yt/core/CMakeLists.linux-x86_64.txt
+++ b/yt/yt/core/CMakeLists.linux-x86_64.txt
@@ -128,6 +128,8 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp
diff --git a/yt/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt/core/CMakeLists.windows-x86_64.txt
index 0094aec31c7..76ae848b35b 100644
--- a/yt/yt/core/CMakeLists.windows-x86_64.txt
+++ b/yt/yt/core/CMakeLists.windows-x86_64.txt
@@ -126,6 +126,8 @@ target_sources(yt-yt-core PRIVATE
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp
+ ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp
${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp
diff --git a/yt/yt/core/concurrency/periodic_executor.cpp b/yt/yt/core/concurrency/periodic_executor.cpp
index f4a47617b50..cd40a50f638 100644
--- a/yt/yt/core/concurrency/periodic_executor.cpp
+++ b/yt/yt/core/concurrency/periodic_executor.cpp
@@ -1,10 +1,4 @@
#include "periodic_executor.h"
-#include "scheduler.h"
-
-#include <yt/yt/core/actions/bind.h>
-#include <yt/yt/core/actions/invoker_util.h>
-
-#include <yt/yt/core/concurrency/thread_affinity.h>
#include <yt/yt/core/utilex/random.h>
@@ -36,259 +30,57 @@ TPeriodicExecutor::TPeriodicExecutor(
IInvokerPtr invoker,
TClosure callback,
TPeriodicExecutorOptions options)
- : Invoker_(std::move(invoker))
- , Callback_(std::move(callback))
+ : TRecurringExecutorBase(std::move(invoker), std::move(callback))
, Period_(options.Period)
, Splay_(options.Splay)
, Jitter_(options.Jitter)
-{
- YT_VERIFY(Invoker_);
- YT_VERIFY(Callback_);
-}
-
-void TPeriodicExecutor::Start()
-{
- auto guard = Guard(SpinLock_);
-
- if (Started_) {
- return;
- }
-
- ExecutedPromise_ = TPromise<void>();
- IdlePromise_ = TPromise<void>();
- Started_ = true;
- if (Period_) {
- PostDelayedCallback(RandomDuration(Splay_));
- }
-}
-
-void TPeriodicExecutor::DoStop(TGuard<NThreading::TSpinLock>& guard)
-{
- if (!Started_) {
- return;
- }
-
- Started_ = false;
- OutOfBandRequested_ = false;
- auto executedPromise = ExecutedPromise_;
- auto executionCanceler = ExecutionCanceler_;
- TDelayedExecutor::CancelAndClear(Cookie_);
-
- guard.Release();
-
- if (executedPromise) {
- executedPromise.TrySet(MakeStoppedError());
- }
-
- if (executionCanceler) {
- executionCanceler(MakeStoppedError());
- }
-}
+{ }
-TFuture<void> TPeriodicExecutor::Stop()
+void TPeriodicExecutor::SetPeriod(std::optional<TDuration> period)
{
auto guard = Guard(SpinLock_);
- if (ExecutingCallback_) {
- InitIdlePromise();
- auto idlePromise = IdlePromise_;
- DoStop(guard);
- return idlePromise;
- } else {
- DoStop(guard);
- return VoidFuture;
- }
-}
-
-TError TPeriodicExecutor::MakeStoppedError()
-{
- return TError(NYT::EErrorCode::Canceled, "Periodic executor is stopped");
-}
-
-void TPeriodicExecutor::InitIdlePromise()
-{
- if (IdlePromise_) {
- return;
- }
-
- if (Started_) {
- IdlePromise_ = NewPromise<void>();
- } else {
- IdlePromise_ = MakePromise<void>(TError());
- }
-}
-
-void TPeriodicExecutor::InitExecutedPromise()
-{
- if (ExecutedPromise_) {
- return;
- }
+ auto oldPeriod = Period_;
+ Period_ = period;
- if (Started_) {
- ExecutedPromise_ = NewPromise<void>();
- } else {
- ExecutedPromise_ = MakePromise<void>(MakeStoppedError());
+ if (period && (!oldPeriod || *period < *oldPeriod)) {
+ KickStartInvocationIfNeeded();
}
}
-void TPeriodicExecutor::ScheduleOutOfBand()
+TDuration TPeriodicExecutor::NextDelay()
{
- auto guard = Guard(SpinLock_);
- if (!Started_)
- return;
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
- if (Busy_) {
- OutOfBandRequested_ = true;
+ if (Jitter_ == 0.0) {
+ return *Period_;
} else {
- guard.Release();
- PostCallback();
+ auto period = *Period_;
+ period += RandomDuration(period) * Jitter_ - period * Jitter_ / 2.;
+ return period;
}
}
-void TPeriodicExecutor::PostDelayedCallback(TDuration delay)
+void TPeriodicExecutor::ScheduleFirstCallback()
{
VERIFY_SPINLOCK_AFFINITY(SpinLock_);
- TDelayedExecutor::CancelAndClear(Cookie_);
- Cookie_ = TDelayedExecutor::Submit(
- BIND_NO_PROPAGATE(&TPeriodicExecutor::OnTimer, MakeWeak(this)),
- delay,
- GetSyncInvoker());
-}
-
-void TPeriodicExecutor::PostCallback()
-{
- auto this_ = MakeWeak(this);
- GuardedInvoke(
- Invoker_,
- BIND_NO_PROPAGATE(&TPeriodicExecutor::OnCallbackSuccess, this_),
- BIND_NO_PROPAGATE(&TPeriodicExecutor::OnCallbackInvocationFailed, this_));
-}
-
-void TPeriodicExecutor::OnTimer(bool aborted)
-{
- if (aborted) {
- return;
- }
- PostCallback();
-}
-
-void TPeriodicExecutor::OnCallbackSuccess()
-{
- TPromise<void> executedPromise;
- {
- auto guard = Guard(SpinLock_);
- if (!Started_ || Busy_) {
- return;
- }
- Busy_ = true;
- ExecutingCallback_ = true;
- ExecutionCanceler_ = GetCurrentFiberCanceler();
- TDelayedExecutor::CancelAndClear(Cookie_);
- if (ExecutedPromise_) {
- executedPromise = ExecutedPromise_;
- ExecutedPromise_ = TPromise<void>();
- }
- if (IdlePromise_) {
- IdlePromise_ = NewPromise<void>();
- }
- }
-
- auto cleanup = [=, this] (bool aborted) {
- if (aborted) {
- return;
- }
- TPromise<void> idlePromise;
- {
- auto guard = Guard(SpinLock_);
- idlePromise = IdlePromise_;
- ExecutingCallback_ = false;
- ExecutionCanceler_.Reset();
- }
-
- if (idlePromise) {
- idlePromise.TrySet();
- }
-
- if (executedPromise) {
- executedPromise.TrySet();
- }
-
- auto guard = Guard(SpinLock_);
-
- YT_VERIFY(Busy_);
- Busy_ = false;
-
- if (!Started_) {
- return;
- }
-
- if (OutOfBandRequested_) {
- OutOfBandRequested_ = false;
- guard.Release();
- PostCallback();
- } else if (Period_) {
- PostDelayedCallback(NextDelay());
- }
- };
-
- try {
- Callback_();
- } catch (const TFiberCanceledException&) {
- // There's very little we can do here safely;
- // in particular, we should refrain from setting promises;
- // let's forward the call to the delayed executor.
- TDelayedExecutor::Submit(
- BIND([this_ = MakeStrong(this), cleanup = std::move(cleanup)] (bool aborted) {
- cleanup(aborted);
- }),
- TDuration::Zero());
- throw;
+ if (Period_) {
+ PostDelayedCallback(RandomDuration(Splay_));
}
-
- cleanup(false);
}
-void TPeriodicExecutor::OnCallbackInvocationFailed()
+void TPeriodicExecutor::ScheduleCallback()
{
- auto guard = Guard(SpinLock_);
-
- if (!Started_) {
- return;
- }
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
if (Period_) {
PostDelayedCallback(NextDelay());
}
}
-void TPeriodicExecutor::SetPeriod(std::optional<TDuration> period)
-{
- auto guard = Guard(SpinLock_);
-
- // Kick-start invocations, if needed.
- if (Started_ && period && (!Period_ || *period < *Period_) && !Busy_) {
- PostDelayedCallback(RandomDuration(Splay_));
- }
-
- Period_ = period;
-}
-
-TFuture<void> TPeriodicExecutor::GetExecutedEvent()
-{
- auto guard = Guard(SpinLock_);
- InitExecutedPromise();
- return ExecutedPromise_.ToFuture().ToUncancelable();
-}
-
-TDuration TPeriodicExecutor::NextDelay()
+TError TPeriodicExecutor::MakeStoppedError()
{
- if (Jitter_ == 0.0) {
- return *Period_;
- } else {
- auto period = *Period_;
- period += RandomDuration(period) * Jitter_ - period * Jitter_ / 2.;
- return period;
- }
+ return TError(NYT::EErrorCode::Canceled, "Periodic executor is stopped");
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/periodic_executor.h b/yt/yt/core/concurrency/periodic_executor.h
index bc4b9ca0e2d..420b5afde82 100644
--- a/yt/yt/core/concurrency/periodic_executor.h
+++ b/yt/yt/core/concurrency/periodic_executor.h
@@ -1,10 +1,7 @@
#pragma once
#include "public.h"
-#include "delayed_executor.h"
-
-#include <yt/yt/core/actions/callback.h>
-#include <yt/yt/core/actions/future.h>
+#include "recurring_executor_base.h"
namespace NYT::NConcurrency {
@@ -25,7 +22,7 @@ struct TPeriodicExecutorOptions
//! Helps to perform certain actions periodically.
class TPeriodicExecutor
- : public TRefCounted
+ : public TRecurringExecutorBase
{
public:
//! Initializes an instance.
@@ -47,58 +44,20 @@ public:
TClosure callback,
std::optional<TDuration> period = {});
- //! Starts the instance.
- //! The first invocation happens with a random delay within splay time.
- void Start();
-
- //! Stops the instance, cancels all subsequent invocations.
- //! Returns a future that becomes set when all outstanding callback
- //! invocations are finished and no more invocations are expected to happen.
- TFuture<void> Stop();
-
- //! Requests an immediate invocation.
- void ScheduleOutOfBand();
-
//! Changes execution period.
void SetPeriod(std::optional<TDuration> period);
- //! Returns the future that become set when
- //! at least one action be fully executed from the moment of method call.
- //! Cancellation of the returned future will not affect the action
- //! or other futures returned by this method.
- TFuture<void> GetExecutedEvent();
+protected:
+ void ScheduleFirstCallback() override;
+ void ScheduleCallback() override;
+
+ TError MakeStoppedError() override;
private:
- const IInvokerPtr Invoker_;
- const TClosure Callback_;
std::optional<TDuration> Period_;
const TDuration Splay_;
const double Jitter_;
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
- bool Started_ = false;
- bool Busy_ = false;
- bool OutOfBandRequested_ = false;
- bool ExecutingCallback_ = false;
- TCallback<void(const TError&)> ExecutionCanceler_;
- TDelayedExecutorCookie Cookie_;
- TPromise<void> IdlePromise_;
- TPromise<void> ExecutedPromise_;
-
- void DoStop(TGuard<NThreading::TSpinLock>& guard);
-
- static TError MakeStoppedError();
-
- void InitIdlePromise();
- void InitExecutedPromise();
-
- void PostDelayedCallback(TDuration delay);
- void PostCallback();
-
- void OnTimer(bool aborted);
- void OnCallbackSuccess();
- void OnCallbackInvocationFailed();
-
TDuration NextDelay();
};
diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h
index fe3ac26f34e..e066a287f0c 100644
--- a/yt/yt/core/concurrency/public.h
+++ b/yt/yt/core/concurrency/public.h
@@ -17,6 +17,7 @@ DECLARE_REFCOUNTED_STRUCT(IThreadPool)
DECLARE_REFCOUNTED_STRUCT(ISuspendableActionQueue)
DECLARE_REFCOUNTED_CLASS(TPeriodicExecutor)
+DECLARE_REFCOUNTED_CLASS(TScheduledExecutor)
DECLARE_REFCOUNTED_CLASS(TInvokerAlarm)
DECLARE_REFCOUNTED_CLASS(TAsyncSemaphore)
diff --git a/yt/yt/core/concurrency/recurring_executor_base.cpp b/yt/yt/core/concurrency/recurring_executor_base.cpp
new file mode 100644
index 00000000000..912355c0ec4
--- /dev/null
+++ b/yt/yt/core/concurrency/recurring_executor_base.cpp
@@ -0,0 +1,269 @@
+#include "recurring_executor_base.h"
+#include "scheduler.h"
+
+#include <yt/yt/core/actions/bind.h>
+
+#include <yt/yt/core/concurrency/thread_affinity.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TRecurringExecutorBase::TRecurringExecutorBase(
+ IInvokerPtr invoker,
+ TClosure callback)
+ : Invoker_(std::move(invoker))
+ , Callback_(std::move(callback))
+{
+ YT_VERIFY(Invoker_);
+ YT_VERIFY(Callback_);
+}
+
+void TRecurringExecutorBase::Start()
+{
+ auto guard = Guard(SpinLock_);
+
+ if (Started_) {
+ return;
+ }
+
+ ExecutedPromise_ = TPromise<void>();
+ IdlePromise_ = TPromise<void>();
+ Started_ = true;
+ ScheduleFirstCallback();
+}
+
+void TRecurringExecutorBase::DoStop(TGuard<NThreading::TSpinLock>& guard)
+{
+ if (!Started_) {
+ return;
+ }
+
+ Started_ = false;
+ OutOfBandRequested_ = false;
+ auto executedPromise = ExecutedPromise_;
+ auto executionCanceler = ExecutionCanceler_;
+ TDelayedExecutor::CancelAndClear(Cookie_);
+
+ guard.Release();
+
+ if (executedPromise) {
+ executedPromise.TrySet(MakeStoppedError());
+ }
+
+ if (executionCanceler) {
+ executionCanceler(MakeStoppedError());
+ }
+}
+
+TFuture<void> TRecurringExecutorBase::Stop()
+{
+ auto guard = Guard(SpinLock_);
+ if (IsExecutingCallback()) {
+ InitIdlePromise();
+ auto idlePromise = IdlePromise_;
+ DoStop(guard);
+ return idlePromise;
+ } else {
+ DoStop(guard);
+ return VoidFuture;
+ }
+}
+
+TFuture<void> TRecurringExecutorBase::GetExecutedEvent()
+{
+ auto guard = Guard(SpinLock_);
+ InitExecutedPromise();
+ return ExecutedPromise_.ToFuture().ToUncancelable();
+}
+
+void TRecurringExecutorBase::InitIdlePromise()
+{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ if (IdlePromise_) {
+ return;
+ }
+
+ if (Started_) {
+ IdlePromise_ = NewPromise<void>();
+ } else {
+ IdlePromise_ = MakePromise<void>(TError());
+ }
+}
+
+void TRecurringExecutorBase::InitExecutedPromise()
+{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ if (ExecutedPromise_) {
+ return;
+ }
+
+ if (Started_) {
+ ExecutedPromise_ = NewPromise<void>();
+ } else {
+ ExecutedPromise_ = MakePromise<void>(MakeStoppedError());
+ }
+}
+
+void TRecurringExecutorBase::ScheduleOutOfBand()
+{
+ auto guard = Guard(SpinLock_);
+ if (!Started_) {
+ return;
+ }
+
+ if (Busy_) {
+ OutOfBandRequested_ = true;
+ } else {
+ guard.Release();
+ PostCallback();
+ }
+}
+
+void TRecurringExecutorBase::PostCallbackWithDeadline(TInstant deadline)
+{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ TDelayedExecutor::CancelAndClear(Cookie_);
+ Cookie_ = TDelayedExecutor::Submit(
+ BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnTimer, MakeWeak(this)),
+ deadline,
+ GetSyncInvoker());
+}
+
+void TRecurringExecutorBase::PostDelayedCallback(TDuration delay)
+{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ TDelayedExecutor::CancelAndClear(Cookie_);
+ Cookie_ = TDelayedExecutor::Submit(
+ BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnTimer, MakeWeak(this)),
+ delay,
+ GetSyncInvoker());
+}
+
+void TRecurringExecutorBase::PostCallback()
+{
+ auto this_ = MakeWeak(this);
+ GuardedInvoke(
+ Invoker_,
+ BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnCallbackSuccess, this_),
+ BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnCallbackInvocationFailed, this_));
+}
+
+void TRecurringExecutorBase::OnTimer(bool aborted)
+{
+ if (aborted) {
+ return;
+ }
+ PostCallback();
+}
+
+void TRecurringExecutorBase::OnCallbackSuccess()
+{
+ TPromise<void> executedPromise;
+ {
+ auto guard = Guard(SpinLock_);
+ if (!Started_ || Busy_) {
+ return;
+ }
+ Busy_ = true;
+ ExecutingCallback_ = true;
+ ExecutionCanceler_ = GetCurrentFiberCanceler();
+ TDelayedExecutor::CancelAndClear(Cookie_);
+ if (ExecutedPromise_) {
+ executedPromise = ExecutedPromise_;
+ ExecutedPromise_ = TPromise<void>();
+ }
+ if (IdlePromise_) {
+ IdlePromise_ = NewPromise<void>();
+ }
+ }
+
+ auto cleanup = [=, this] (bool aborted) {
+ if (aborted) {
+ return;
+ }
+
+ TPromise<void> idlePromise;
+ {
+ auto guard = Guard(SpinLock_);
+ idlePromise = IdlePromise_;
+ ExecutingCallback_ = false;
+ ExecutionCanceler_.Reset();
+ }
+
+ if (idlePromise) {
+ idlePromise.TrySet();
+ }
+
+ if (executedPromise) {
+ executedPromise.TrySet();
+ }
+
+ auto guard = Guard(SpinLock_);
+
+ YT_VERIFY(Busy_);
+ Busy_ = false;
+
+ if (!Started_) {
+ return;
+ }
+ if (OutOfBandRequested_) {
+ OutOfBandRequested_ = false;
+ guard.Release();
+ PostCallback();
+ } else {
+ ScheduleCallback();
+ }
+ };
+
+ try {
+ Callback_();
+ } catch (const TFiberCanceledException&) {
+ // There's very little we can do here safely;
+ // in particular, we should refrain from setting promises;
+ // let's forward the call to the delayed executor.
+ TDelayedExecutor::Submit(
+ BIND([this_ = MakeStrong(this), cleanup = std::move(cleanup)] (bool aborted) {
+ cleanup(aborted);
+ }),
+ TDuration::Zero());
+ throw;
+ }
+
+ cleanup(false);
+}
+
+void TRecurringExecutorBase::OnCallbackInvocationFailed()
+{
+ auto guard = Guard(SpinLock_);
+
+ if (!Started_) {
+ return;
+ }
+
+ ScheduleCallback();
+}
+
+void TRecurringExecutorBase::KickStartInvocationIfNeeded()
+{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ if (Started_ && !Busy_) {
+ ScheduleFirstCallback();
+ }
+}
+
+bool TRecurringExecutorBase::IsExecutingCallback() const
+{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ return ExecutingCallback_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/recurring_executor_base.h b/yt/yt/core/concurrency/recurring_executor_base.h
new file mode 100644
index 00000000000..76d5ed24e2b
--- /dev/null
+++ b/yt/yt/core/concurrency/recurring_executor_base.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include "public.h"
+#include "delayed_executor.h"
+
+#include <yt/yt/core/actions/callback.h>
+#include <yt/yt/core/actions/future.h>
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Abstract base class providing possibility to perform actions with certain schedule.
+class TRecurringExecutorBase
+ : public TRefCounted
+{
+public:
+ //! Initializes an instance.
+ /*!
+ * \note
+ * We must call #Start to activate the instance.
+ *
+ * \param invoker Invoker used for wrapping actions.
+ * \param callback Callback to invoke according to the schedule.
+ */
+ TRecurringExecutorBase(
+ IInvokerPtr invoker,
+ TClosure callback);
+
+ //! Starts the instance.
+ //! The first invocation happens with a delay defined by the class heir.
+ void Start();
+
+ //! Stops the instance, cancels all subsequent invocations.
+ //! Returns a future that becomes set when all outstanding callback
+ //! invocations are finished and no more invocations are expected to happen.
+ TFuture<void> Stop();
+
+ //! Requests an immediate invocation.
+ void ScheduleOutOfBand();
+
+ //! Returns the future that become set when
+ //! at least one action be fully executed from the moment of method call.
+ //! Cancellation of the returned future will not affect the action
+ //! or other futures returned by this method.
+ TFuture<void> GetExecutedEvent();
+
+private:
+ const IInvokerPtr Invoker_;
+ const TClosure Callback_;
+
+ bool Started_ = false;
+ bool Busy_ = false;
+ bool OutOfBandRequested_ = false;
+ bool ExecutingCallback_ = false;
+ TCallback<void(const TError&)> ExecutionCanceler_;
+ TDelayedExecutorCookie Cookie_;
+ TPromise<void> IdlePromise_;
+ TPromise<void> ExecutedPromise_;
+
+ void InitIdlePromise();
+ void InitExecutedPromise();
+
+ void OnTimer(bool aborted);
+ void OnCallbackSuccess();
+ void OnCallbackInvocationFailed();
+
+ void DoStop(TGuard<NThreading::TSpinLock>& guard);
+
+protected:
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
+
+ void PostCallbackWithDeadline(TInstant deadline);
+ void PostDelayedCallback(TDuration delay);
+ void PostCallback();
+
+ void KickStartInvocationIfNeeded();
+
+ bool IsExecutingCallback() const;
+
+ virtual void ScheduleFirstCallback() = 0;
+ virtual void ScheduleCallback() = 0;
+
+ virtual TError MakeStoppedError() = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
+
diff --git a/yt/yt/core/concurrency/scheduled_executor.cpp b/yt/yt/core/concurrency/scheduled_executor.cpp
new file mode 100644
index 00000000000..5eef530e4dd
--- /dev/null
+++ b/yt/yt/core/concurrency/scheduled_executor.cpp
@@ -0,0 +1,63 @@
+#include "scheduled_executor.h"
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TScheduledExecutor::TScheduledExecutor(
+ IInvokerPtr invoker,
+ TClosure callback,
+ std::optional<TDuration> interval)
+ : TRecurringExecutorBase(std::move(invoker), std::move(callback))
+ , Interval_(interval)
+{
+ YT_VERIFY(!Interval_ || Interval_ != TDuration::Zero());
+}
+
+void TScheduledExecutor::SetInterval(std::optional<TDuration> interval)
+{
+ YT_VERIFY(!interval || interval != TDuration::Zero());
+
+ auto guard = Guard(SpinLock_);
+
+ auto oldInterval = Interval_;
+ Interval_ = interval;
+
+ if (interval && !(oldInterval && oldInterval->GetValue() % oldInterval->GetValue() == 0)) {
+ KickStartInvocationIfNeeded();
+ }
+}
+
+void TScheduledExecutor::ScheduleFirstCallback()
+{
+ ScheduleCallback();
+}
+
+void TScheduledExecutor::ScheduleCallback()
+{
+ if (Interval_) {
+ PostCallbackWithDeadline(NextDeadline());
+ }
+}
+
+TError TScheduledExecutor::MakeStoppedError()
+{
+ return TError(NYT::EErrorCode::Canceled, "Interval executor is stopped");
+}
+
+TInstant TScheduledExecutor::NextDeadline()
+{
+ VERIFY_SPINLOCK_AFFINITY(SpinLock_);
+
+ YT_VERIFY(Interval_);
+
+ // TInstant and TDuration are guaranteed to have same precision.
+ const auto& intervalValue = Interval_->GetValue();
+ const auto& nowValue = TInstant::Now().GetValue();
+
+ return TInstant::FromValue(nowValue + (intervalValue - nowValue % intervalValue));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/scheduled_executor.h b/yt/yt/core/concurrency/scheduled_executor.h
new file mode 100644
index 00000000000..bc35208d53c
--- /dev/null
+++ b/yt/yt/core/concurrency/scheduled_executor.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include "public.h"
+#include "recurring_executor_base.h"
+
+namespace NYT::NConcurrency {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Invokes callbacks according to a primitive schedule.
+//! Given a set non-zero interval, callbacks will be executed at times, which are a multiple of this interval.
+//! E.g. if the interval is 5 minutes, callbacks will be executed at 00:00, 00:05, 00:10, etc.
+class TScheduledExecutor
+ : public TRecurringExecutorBase
+{
+public:
+ //! Initializes an instance.
+ /*!
+ * \note
+ * We must call #Start to activate the instance.
+ *
+ * \param invoker Invoker used for wrapping actions.
+ * \param callback Callback to invoke periodically.
+ * \param interval Determines the moments of time at which the callback will be executed.
+ */
+ TScheduledExecutor(
+ IInvokerPtr invoker,
+ TClosure callback,
+ std::optional<TDuration> interval);
+
+ void SetInterval(std::optional<TDuration> interval);
+
+protected:
+ void ScheduleFirstCallback() override;
+ void ScheduleCallback() override;
+
+ TError MakeStoppedError() override;
+
+private:
+ std::optional<TDuration> Interval_;
+
+ //! Returns the next time instant which is a multiple of the configured interval.
+ //! NB: If the current instant is itself a multiple of the configured interval, this method will return the next
+ //! suitable instant.
+ TInstant NextDeadline();
+};
+
+DEFINE_REFCOUNTED_TYPE(TScheduledExecutor)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/unittests/periodic_ut.cpp b/yt/yt/core/concurrency/unittests/periodic_ut.cpp
index c5309d1175e..d033ed3f934 100644
--- a/yt/yt/core/concurrency/unittests/periodic_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/periodic_ut.cpp
@@ -29,6 +29,7 @@ class TPeriodicTest
TEST_W(TPeriodicTest, Simple)
{
std::atomic<int> count = {0};
+
auto callback = BIND([&] () {
TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(200));
++count;
@@ -61,11 +62,38 @@ TEST_W(TPeriodicTest, Simple)
.ThrowOnError();
}
-// TODO(achulkov2): Add simple out of band execution test.
+TEST_W(TPeriodicTest, SimpleScheduleOutOfBand)
+{
+ std::atomic<int> count = {0};
+
+ auto callback = BIND([&] () {
+ ++count;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TPeriodicExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ TDuration::MilliSeconds(300));
+
+ executor->Start();
+ // Wait for first invocation.
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(20));
+
+ auto future = executor->GetExecutedEvent();
+ const auto& now = TInstant::Now();
+ executor->ScheduleOutOfBand();
+ WaitFor(future)
+ .ThrowOnError();
+ auto executionDuration = TInstant::Now() - now;
+ EXPECT_LT(executionDuration, TDuration::MilliSeconds(20));
+ EXPECT_EQ(2, count.load());
+}
TEST_W(TPeriodicTest, ParallelStop)
{
std::atomic<int> count = {0};
+
auto callback = BIND([&] () {
++count;
TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500));
@@ -108,6 +136,7 @@ TEST_W(TPeriodicTest, ParallelOnExecuted1)
TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500));
++count;
});
+
auto actionQueue = New<TActionQueue>();
auto executor = New<TPeriodicExecutor>(
actionQueue->GetInvoker(),
@@ -144,6 +173,7 @@ TEST_W(TPeriodicTest, ParallelOnExecuted2)
TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
++count;
});
+
auto actionQueue = New<TActionQueue>();
auto executor = New<TPeriodicExecutor>(
actionQueue->GetInvoker(),
@@ -180,6 +210,7 @@ TEST_W(TPeriodicTest, OnExecutedEventCanceled)
TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
++count;
});
+
auto actionQueue = New<TActionQueue>();
auto executor = New<TPeriodicExecutor>(
actionQueue->GetInvoker(),
@@ -206,9 +237,11 @@ TEST_W(TPeriodicTest, Stop)
{
auto neverSetPromise = NewPromise<void>();
auto immediatelyCancelableFuture = neverSetPromise.ToFuture().ToImmediatelyCancelable();
+
auto callback = BIND([&] {
WaitUntilSet(immediatelyCancelableFuture);
});
+
auto actionQueue = New<TActionQueue>();
auto executor = New<TPeriodicExecutor>(
actionQueue->GetInvoker(),
diff --git a/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp
new file mode 100644
index 00000000000..c225667c3a3
--- /dev/null
+++ b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp
@@ -0,0 +1,263 @@
+#include <yt/yt/core/test_framework/framework.h>
+
+#include <yt/yt/core/actions/invoker_util.h>
+
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/delayed_executor.h>
+#include <yt/yt/core/concurrency/scheduled_executor.h>
+
+#include <atomic>
+
+namespace NYT::NConcurrency {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TScheduledExecutorTest
+ : public ::testing::Test
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+void CheckTimeSlotCorrectness(const TDuration& interval)
+{
+ const auto& now = TInstant::Now();
+ auto lastTick = TInstant::FromValue((now.GetValue() / interval.GetValue()) * interval.GetValue());
+ YT_VERIFY(now - lastTick <= TDuration::MilliSeconds(10));
+}
+
+TEST_W(TScheduledExecutorTest, Simple)
+{
+ const auto& interval = TDuration::MilliSeconds(200);
+ std::atomic<int> count = {0};
+
+ auto callback = BIND([&] () {
+ CheckTimeSlotCorrectness(interval);
+ ++count;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TScheduledExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ interval);
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300));
+ WaitFor(executor->Stop())
+ .ThrowOnError();
+ EXPECT_LE(1, count.load());
+ EXPECT_GE(2, count.load());
+}
+
+TEST_W(TScheduledExecutorTest, SimpleScheduleOutOfBand)
+{
+ std::atomic<int> count = {0};
+
+ auto callback = BIND([&] () {
+ ++count;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TScheduledExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ TDuration::MilliSeconds(300));
+
+ executor->Start();
+ TDuration executionDuration;
+ {
+ auto future1 = executor->GetExecutedEvent();
+ auto future2 = executor->GetExecutedEvent();
+ const auto& now = TInstant::Now();
+ executor->ScheduleOutOfBand();
+ WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2})))
+ .ThrowOnError();
+ executionDuration = TInstant::Now() - now;
+ }
+ EXPECT_EQ(1, count.load());
+ EXPECT_GT(TDuration::MilliSeconds(20), executionDuration);
+}
+
+TEST_W(TScheduledExecutorTest, ParallelStop)
+{
+ const auto& interval = TDuration::MilliSeconds(10);
+ std::atomic<int> count = {0};
+
+ auto callback = BIND([&] () {
+ CheckTimeSlotCorrectness(interval);
+ ++count;
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500));
+ ++count;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TScheduledExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ interval);
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300));
+ {
+ auto future1 = executor->Stop();
+ auto future2 = executor->Stop();
+ WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2})))
+ .ThrowOnError();
+ }
+ EXPECT_EQ(1, count.load());
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300));
+ {
+ auto future1 = executor->Stop();
+ auto future2 = executor->Stop();
+ auto future3 = executor->Stop();
+ WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2, future3})))
+ .ThrowOnError();
+ }
+ EXPECT_EQ(2, count.load());
+}
+
+TEST_W(TScheduledExecutorTest, ParallelOnExecuted1)
+{
+ const auto& interval = TDuration::MilliSeconds(10);
+ std::atomic<int> count = 0;
+
+ auto callback = BIND([&] () {
+ CheckTimeSlotCorrectness(interval);
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500));
+ ++count;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TScheduledExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ interval);
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300));
+ {
+ auto future1 = executor->GetExecutedEvent();
+ auto future2 = executor->GetExecutedEvent();
+ WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2})))
+ .ThrowOnError();
+ }
+ EXPECT_EQ(2, count.load());
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(450));
+ {
+ auto future1 = executor->GetExecutedEvent();
+ auto future2 = executor->GetExecutedEvent();
+ auto future3 = executor->GetExecutedEvent();
+ WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2, future3})))
+ .ThrowOnError();
+ }
+ EXPECT_EQ(4, count.load());
+}
+
+TEST_W(TScheduledExecutorTest, ParallelOnExecuted2)
+{
+ const auto& interval = TDuration::MilliSeconds(400);
+ std::atomic<int> count = 0;
+
+ auto callback = BIND([&] () {
+ CheckTimeSlotCorrectness(interval);
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
+ ++count;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TScheduledExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ interval);
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(400));
+ {
+ auto future1 = executor->GetExecutedEvent();
+ auto future2 = executor->GetExecutedEvent();
+ WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2})))
+ .ThrowOnError();
+ }
+ EXPECT_EQ(2, count.load());
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
+ {
+ auto future1 = executor->GetExecutedEvent();
+ auto future2 = executor->GetExecutedEvent();
+ auto future3 = executor->GetExecutedEvent();
+ WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2, future3})))
+ .ThrowOnError();
+ }
+ EXPECT_EQ(3, count.load());
+}
+
+TEST_W(TScheduledExecutorTest, OnExecutedEventCanceled)
+{
+ const auto& interval = TDuration::MilliSeconds(50);
+ std::atomic<int> count = 0;
+
+ auto callback = BIND([&] () {
+ CheckTimeSlotCorrectness(interval);
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(200));
+ ++count;
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TScheduledExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ interval);
+
+ executor->Start();
+ TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100));
+ {
+ auto future1 = executor->GetExecutedEvent();
+ auto future2 = executor->GetExecutedEvent();
+
+ // Cancellation of the executed event future must not propagate to the underlying event.
+ auto future3 = executor->GetExecutedEvent();
+ future3.Cancel(TError(NYT::EErrorCode::Canceled, "Canceled"));
+
+ EXPECT_NO_THROW(WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2})))
+ .ThrowOnError());
+ }
+ EXPECT_EQ(2, count.load());
+}
+
+TEST_W(TScheduledExecutorTest, Stop)
+{
+ const auto& interval = TDuration::MilliSeconds(20);
+ auto neverSetPromise = NewPromise<void>();
+ auto immediatelyCancelableFuture = neverSetPromise.ToFuture().ToImmediatelyCancelable();
+
+ auto callback = BIND([&] {
+ CheckTimeSlotCorrectness(interval);
+ WaitUntilSet(immediatelyCancelableFuture);
+ });
+
+ auto actionQueue = New<TActionQueue>();
+ auto executor = New<TScheduledExecutor>(
+ actionQueue->GetInvoker(),
+ callback,
+ interval);
+
+ executor->Start();
+ // Wait for the callback to enter WaitFor.
+ Sleep(TDuration::MilliSeconds(100));
+ WaitFor(executor->Stop())
+ .ThrowOnError();
+
+ EXPECT_TRUE(immediatelyCancelableFuture.IsSet());
+ EXPECT_EQ(NYT::EErrorCode::Canceled, immediatelyCancelableFuture.Get().GetCode());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make
index b507752539a..af74851a821 100644
--- a/yt/yt/core/concurrency/unittests/ya.make
+++ b/yt/yt/core/concurrency/unittests/ya.make
@@ -28,6 +28,7 @@ SRCS(
periodic_ut.cpp
propagating_storage_ut.cpp
quantized_executor_ut.cpp
+ scheduled_executor_ut.cpp
scheduler_ut.cpp
suspendable_action_queue_ut.cpp
thread_affinity_ut.cpp
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index db4201c01cf..e985dd82aea 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -81,6 +81,8 @@ SRCS(
concurrency/thread_pool.cpp
concurrency/throughput_throttler.cpp
concurrency/two_level_fair_share_thread_pool.cpp
+ concurrency/recurring_executor_base.cpp
+ concurrency/scheduled_executor.cpp
crypto/config.cpp
crypto/crypto.cpp