diff options
author | achulkov2 <achulkov2@yandex-team.com> | 2023-09-29 00:06:49 +0300 |
---|---|---|
committer | achulkov2 <achulkov2@yandex-team.com> | 2023-09-29 00:32:49 +0300 |
commit | f5f10c72b47a362fcf641902cca19acd406dd6ab (patch) | |
tree | 34a0f0794b6c3fac71b93737a93eb13810dc2b97 | |
parent | 95fac3fb8d054fb4f0b538d358e09bbadcffbc91 (diff) | |
download | ydb-f5f10c72b47a362fcf641902cca19acd406dd6ab.tar.gz |
YT-19517: Separate common base from TPeriodicExecutor and a new TScheduledExecutor
-rw-r--r-- | yt/yt/core/CMakeLists.darwin-x86_64.txt | 2 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.linux-aarch64.txt | 2 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.linux-x86_64.txt | 2 | ||||
-rw-r--r-- | yt/yt/core/CMakeLists.windows-x86_64.txt | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/periodic_executor.cpp | 250 | ||||
-rw-r--r-- | yt/yt/core/concurrency/periodic_executor.h | 55 | ||||
-rw-r--r-- | yt/yt/core/concurrency/public.h | 1 | ||||
-rw-r--r-- | yt/yt/core/concurrency/recurring_executor_base.cpp | 269 | ||||
-rw-r--r-- | yt/yt/core/concurrency/recurring_executor_base.h | 90 | ||||
-rw-r--r-- | yt/yt/core/concurrency/scheduled_executor.cpp | 63 | ||||
-rw-r--r-- | yt/yt/core/concurrency/scheduled_executor.h | 52 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/periodic_ut.cpp | 35 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp | 263 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/core/ya.make | 2 |
15 files changed, 811 insertions, 278 deletions
diff --git a/yt/yt/core/CMakeLists.darwin-x86_64.txt b/yt/yt/core/CMakeLists.darwin-x86_64.txt index 9ee10c5a62e..0b4da77950b 100644 --- a/yt/yt/core/CMakeLists.darwin-x86_64.txt +++ b/yt/yt/core/CMakeLists.darwin-x86_64.txt @@ -127,6 +127,8 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp diff --git a/yt/yt/core/CMakeLists.linux-aarch64.txt b/yt/yt/core/CMakeLists.linux-aarch64.txt index ca7b9ac8eb5..51eeb4ff568 100644 --- a/yt/yt/core/CMakeLists.linux-aarch64.txt +++ b/yt/yt/core/CMakeLists.linux-aarch64.txt @@ -127,6 +127,8 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp diff --git a/yt/yt/core/CMakeLists.linux-x86_64.txt b/yt/yt/core/CMakeLists.linux-x86_64.txt index d47c1788f9e..164e626f9b2 100644 --- a/yt/yt/core/CMakeLists.linux-x86_64.txt +++ b/yt/yt/core/CMakeLists.linux-x86_64.txt @@ -128,6 +128,8 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp diff --git a/yt/yt/core/CMakeLists.windows-x86_64.txt b/yt/yt/core/CMakeLists.windows-x86_64.txt index 0094aec31c7..76ae848b35b 100644 --- a/yt/yt/core/CMakeLists.windows-x86_64.txt +++ b/yt/yt/core/CMakeLists.windows-x86_64.txt @@ -126,6 +126,8 @@ target_sources(yt-yt-core PRIVATE ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/thread_pool.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/throughput_throttler.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/recurring_executor_base.cpp + ${CMAKE_SOURCE_DIR}/yt/yt/core/concurrency/scheduled_executor.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/config.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/crypto.cpp ${CMAKE_SOURCE_DIR}/yt/yt/core/crypto/tls.cpp diff --git a/yt/yt/core/concurrency/periodic_executor.cpp b/yt/yt/core/concurrency/periodic_executor.cpp index f4a47617b50..cd40a50f638 100644 --- a/yt/yt/core/concurrency/periodic_executor.cpp +++ b/yt/yt/core/concurrency/periodic_executor.cpp @@ -1,10 +1,4 @@ #include "periodic_executor.h" -#include "scheduler.h" - -#include <yt/yt/core/actions/bind.h> -#include <yt/yt/core/actions/invoker_util.h> - -#include <yt/yt/core/concurrency/thread_affinity.h> #include <yt/yt/core/utilex/random.h> @@ -36,259 +30,57 @@ TPeriodicExecutor::TPeriodicExecutor( IInvokerPtr invoker, TClosure callback, TPeriodicExecutorOptions options) - : Invoker_(std::move(invoker)) - , Callback_(std::move(callback)) + : TRecurringExecutorBase(std::move(invoker), std::move(callback)) , Period_(options.Period) , Splay_(options.Splay) , Jitter_(options.Jitter) -{ - YT_VERIFY(Invoker_); - YT_VERIFY(Callback_); -} - -void TPeriodicExecutor::Start() -{ - auto guard = Guard(SpinLock_); - - if (Started_) { - return; - } - - ExecutedPromise_ = TPromise<void>(); - IdlePromise_ = TPromise<void>(); - Started_ = true; - if (Period_) { - PostDelayedCallback(RandomDuration(Splay_)); - } -} - -void TPeriodicExecutor::DoStop(TGuard<NThreading::TSpinLock>& guard) -{ - if (!Started_) { - return; - } - - Started_ = false; - OutOfBandRequested_ = false; - auto executedPromise = ExecutedPromise_; - auto executionCanceler = ExecutionCanceler_; - TDelayedExecutor::CancelAndClear(Cookie_); - - guard.Release(); - - if (executedPromise) { - executedPromise.TrySet(MakeStoppedError()); - } - - if (executionCanceler) { - executionCanceler(MakeStoppedError()); - } -} +{ } -TFuture<void> TPeriodicExecutor::Stop() +void TPeriodicExecutor::SetPeriod(std::optional<TDuration> period) { auto guard = Guard(SpinLock_); - if (ExecutingCallback_) { - InitIdlePromise(); - auto idlePromise = IdlePromise_; - DoStop(guard); - return idlePromise; - } else { - DoStop(guard); - return VoidFuture; - } -} - -TError TPeriodicExecutor::MakeStoppedError() -{ - return TError(NYT::EErrorCode::Canceled, "Periodic executor is stopped"); -} - -void TPeriodicExecutor::InitIdlePromise() -{ - if (IdlePromise_) { - return; - } - - if (Started_) { - IdlePromise_ = NewPromise<void>(); - } else { - IdlePromise_ = MakePromise<void>(TError()); - } -} - -void TPeriodicExecutor::InitExecutedPromise() -{ - if (ExecutedPromise_) { - return; - } + auto oldPeriod = Period_; + Period_ = period; - if (Started_) { - ExecutedPromise_ = NewPromise<void>(); - } else { - ExecutedPromise_ = MakePromise<void>(MakeStoppedError()); + if (period && (!oldPeriod || *period < *oldPeriod)) { + KickStartInvocationIfNeeded(); } } -void TPeriodicExecutor::ScheduleOutOfBand() +TDuration TPeriodicExecutor::NextDelay() { - auto guard = Guard(SpinLock_); - if (!Started_) - return; + VERIFY_SPINLOCK_AFFINITY(SpinLock_); - if (Busy_) { - OutOfBandRequested_ = true; + if (Jitter_ == 0.0) { + return *Period_; } else { - guard.Release(); - PostCallback(); + auto period = *Period_; + period += RandomDuration(period) * Jitter_ - period * Jitter_ / 2.; + return period; } } -void TPeriodicExecutor::PostDelayedCallback(TDuration delay) +void TPeriodicExecutor::ScheduleFirstCallback() { VERIFY_SPINLOCK_AFFINITY(SpinLock_); - TDelayedExecutor::CancelAndClear(Cookie_); - Cookie_ = TDelayedExecutor::Submit( - BIND_NO_PROPAGATE(&TPeriodicExecutor::OnTimer, MakeWeak(this)), - delay, - GetSyncInvoker()); -} - -void TPeriodicExecutor::PostCallback() -{ - auto this_ = MakeWeak(this); - GuardedInvoke( - Invoker_, - BIND_NO_PROPAGATE(&TPeriodicExecutor::OnCallbackSuccess, this_), - BIND_NO_PROPAGATE(&TPeriodicExecutor::OnCallbackInvocationFailed, this_)); -} - -void TPeriodicExecutor::OnTimer(bool aborted) -{ - if (aborted) { - return; - } - PostCallback(); -} - -void TPeriodicExecutor::OnCallbackSuccess() -{ - TPromise<void> executedPromise; - { - auto guard = Guard(SpinLock_); - if (!Started_ || Busy_) { - return; - } - Busy_ = true; - ExecutingCallback_ = true; - ExecutionCanceler_ = GetCurrentFiberCanceler(); - TDelayedExecutor::CancelAndClear(Cookie_); - if (ExecutedPromise_) { - executedPromise = ExecutedPromise_; - ExecutedPromise_ = TPromise<void>(); - } - if (IdlePromise_) { - IdlePromise_ = NewPromise<void>(); - } - } - - auto cleanup = [=, this] (bool aborted) { - if (aborted) { - return; - } - TPromise<void> idlePromise; - { - auto guard = Guard(SpinLock_); - idlePromise = IdlePromise_; - ExecutingCallback_ = false; - ExecutionCanceler_.Reset(); - } - - if (idlePromise) { - idlePromise.TrySet(); - } - - if (executedPromise) { - executedPromise.TrySet(); - } - - auto guard = Guard(SpinLock_); - - YT_VERIFY(Busy_); - Busy_ = false; - - if (!Started_) { - return; - } - - if (OutOfBandRequested_) { - OutOfBandRequested_ = false; - guard.Release(); - PostCallback(); - } else if (Period_) { - PostDelayedCallback(NextDelay()); - } - }; - - try { - Callback_(); - } catch (const TFiberCanceledException&) { - // There's very little we can do here safely; - // in particular, we should refrain from setting promises; - // let's forward the call to the delayed executor. - TDelayedExecutor::Submit( - BIND([this_ = MakeStrong(this), cleanup = std::move(cleanup)] (bool aborted) { - cleanup(aborted); - }), - TDuration::Zero()); - throw; + if (Period_) { + PostDelayedCallback(RandomDuration(Splay_)); } - - cleanup(false); } -void TPeriodicExecutor::OnCallbackInvocationFailed() +void TPeriodicExecutor::ScheduleCallback() { - auto guard = Guard(SpinLock_); - - if (!Started_) { - return; - } + VERIFY_SPINLOCK_AFFINITY(SpinLock_); if (Period_) { PostDelayedCallback(NextDelay()); } } -void TPeriodicExecutor::SetPeriod(std::optional<TDuration> period) -{ - auto guard = Guard(SpinLock_); - - // Kick-start invocations, if needed. - if (Started_ && period && (!Period_ || *period < *Period_) && !Busy_) { - PostDelayedCallback(RandomDuration(Splay_)); - } - - Period_ = period; -} - -TFuture<void> TPeriodicExecutor::GetExecutedEvent() -{ - auto guard = Guard(SpinLock_); - InitExecutedPromise(); - return ExecutedPromise_.ToFuture().ToUncancelable(); -} - -TDuration TPeriodicExecutor::NextDelay() +TError TPeriodicExecutor::MakeStoppedError() { - if (Jitter_ == 0.0) { - return *Period_; - } else { - auto period = *Period_; - period += RandomDuration(period) * Jitter_ - period * Jitter_ / 2.; - return period; - } + return TError(NYT::EErrorCode::Canceled, "Periodic executor is stopped"); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/periodic_executor.h b/yt/yt/core/concurrency/periodic_executor.h index bc4b9ca0e2d..420b5afde82 100644 --- a/yt/yt/core/concurrency/periodic_executor.h +++ b/yt/yt/core/concurrency/periodic_executor.h @@ -1,10 +1,7 @@ #pragma once #include "public.h" -#include "delayed_executor.h" - -#include <yt/yt/core/actions/callback.h> -#include <yt/yt/core/actions/future.h> +#include "recurring_executor_base.h" namespace NYT::NConcurrency { @@ -25,7 +22,7 @@ struct TPeriodicExecutorOptions //! Helps to perform certain actions periodically. class TPeriodicExecutor - : public TRefCounted + : public TRecurringExecutorBase { public: //! Initializes an instance. @@ -47,58 +44,20 @@ public: TClosure callback, std::optional<TDuration> period = {}); - //! Starts the instance. - //! The first invocation happens with a random delay within splay time. - void Start(); - - //! Stops the instance, cancels all subsequent invocations. - //! Returns a future that becomes set when all outstanding callback - //! invocations are finished and no more invocations are expected to happen. - TFuture<void> Stop(); - - //! Requests an immediate invocation. - void ScheduleOutOfBand(); - //! Changes execution period. void SetPeriod(std::optional<TDuration> period); - //! Returns the future that become set when - //! at least one action be fully executed from the moment of method call. - //! Cancellation of the returned future will not affect the action - //! or other futures returned by this method. - TFuture<void> GetExecutedEvent(); +protected: + void ScheduleFirstCallback() override; + void ScheduleCallback() override; + + TError MakeStoppedError() override; private: - const IInvokerPtr Invoker_; - const TClosure Callback_; std::optional<TDuration> Period_; const TDuration Splay_; const double Jitter_; - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); - bool Started_ = false; - bool Busy_ = false; - bool OutOfBandRequested_ = false; - bool ExecutingCallback_ = false; - TCallback<void(const TError&)> ExecutionCanceler_; - TDelayedExecutorCookie Cookie_; - TPromise<void> IdlePromise_; - TPromise<void> ExecutedPromise_; - - void DoStop(TGuard<NThreading::TSpinLock>& guard); - - static TError MakeStoppedError(); - - void InitIdlePromise(); - void InitExecutedPromise(); - - void PostDelayedCallback(TDuration delay); - void PostCallback(); - - void OnTimer(bool aborted); - void OnCallbackSuccess(); - void OnCallbackInvocationFailed(); - TDuration NextDelay(); }; diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h index fe3ac26f34e..e066a287f0c 100644 --- a/yt/yt/core/concurrency/public.h +++ b/yt/yt/core/concurrency/public.h @@ -17,6 +17,7 @@ DECLARE_REFCOUNTED_STRUCT(IThreadPool) DECLARE_REFCOUNTED_STRUCT(ISuspendableActionQueue) DECLARE_REFCOUNTED_CLASS(TPeriodicExecutor) +DECLARE_REFCOUNTED_CLASS(TScheduledExecutor) DECLARE_REFCOUNTED_CLASS(TInvokerAlarm) DECLARE_REFCOUNTED_CLASS(TAsyncSemaphore) diff --git a/yt/yt/core/concurrency/recurring_executor_base.cpp b/yt/yt/core/concurrency/recurring_executor_base.cpp new file mode 100644 index 00000000000..912355c0ec4 --- /dev/null +++ b/yt/yt/core/concurrency/recurring_executor_base.cpp @@ -0,0 +1,269 @@ +#include "recurring_executor_base.h" +#include "scheduler.h" + +#include <yt/yt/core/actions/bind.h> + +#include <yt/yt/core/concurrency/thread_affinity.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +TRecurringExecutorBase::TRecurringExecutorBase( + IInvokerPtr invoker, + TClosure callback) + : Invoker_(std::move(invoker)) + , Callback_(std::move(callback)) +{ + YT_VERIFY(Invoker_); + YT_VERIFY(Callback_); +} + +void TRecurringExecutorBase::Start() +{ + auto guard = Guard(SpinLock_); + + if (Started_) { + return; + } + + ExecutedPromise_ = TPromise<void>(); + IdlePromise_ = TPromise<void>(); + Started_ = true; + ScheduleFirstCallback(); +} + +void TRecurringExecutorBase::DoStop(TGuard<NThreading::TSpinLock>& guard) +{ + if (!Started_) { + return; + } + + Started_ = false; + OutOfBandRequested_ = false; + auto executedPromise = ExecutedPromise_; + auto executionCanceler = ExecutionCanceler_; + TDelayedExecutor::CancelAndClear(Cookie_); + + guard.Release(); + + if (executedPromise) { + executedPromise.TrySet(MakeStoppedError()); + } + + if (executionCanceler) { + executionCanceler(MakeStoppedError()); + } +} + +TFuture<void> TRecurringExecutorBase::Stop() +{ + auto guard = Guard(SpinLock_); + if (IsExecutingCallback()) { + InitIdlePromise(); + auto idlePromise = IdlePromise_; + DoStop(guard); + return idlePromise; + } else { + DoStop(guard); + return VoidFuture; + } +} + +TFuture<void> TRecurringExecutorBase::GetExecutedEvent() +{ + auto guard = Guard(SpinLock_); + InitExecutedPromise(); + return ExecutedPromise_.ToFuture().ToUncancelable(); +} + +void TRecurringExecutorBase::InitIdlePromise() +{ + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + if (IdlePromise_) { + return; + } + + if (Started_) { + IdlePromise_ = NewPromise<void>(); + } else { + IdlePromise_ = MakePromise<void>(TError()); + } +} + +void TRecurringExecutorBase::InitExecutedPromise() +{ + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + if (ExecutedPromise_) { + return; + } + + if (Started_) { + ExecutedPromise_ = NewPromise<void>(); + } else { + ExecutedPromise_ = MakePromise<void>(MakeStoppedError()); + } +} + +void TRecurringExecutorBase::ScheduleOutOfBand() +{ + auto guard = Guard(SpinLock_); + if (!Started_) { + return; + } + + if (Busy_) { + OutOfBandRequested_ = true; + } else { + guard.Release(); + PostCallback(); + } +} + +void TRecurringExecutorBase::PostCallbackWithDeadline(TInstant deadline) +{ + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + TDelayedExecutor::CancelAndClear(Cookie_); + Cookie_ = TDelayedExecutor::Submit( + BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnTimer, MakeWeak(this)), + deadline, + GetSyncInvoker()); +} + +void TRecurringExecutorBase::PostDelayedCallback(TDuration delay) +{ + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + TDelayedExecutor::CancelAndClear(Cookie_); + Cookie_ = TDelayedExecutor::Submit( + BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnTimer, MakeWeak(this)), + delay, + GetSyncInvoker()); +} + +void TRecurringExecutorBase::PostCallback() +{ + auto this_ = MakeWeak(this); + GuardedInvoke( + Invoker_, + BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnCallbackSuccess, this_), + BIND_NO_PROPAGATE(&TRecurringExecutorBase::OnCallbackInvocationFailed, this_)); +} + +void TRecurringExecutorBase::OnTimer(bool aborted) +{ + if (aborted) { + return; + } + PostCallback(); +} + +void TRecurringExecutorBase::OnCallbackSuccess() +{ + TPromise<void> executedPromise; + { + auto guard = Guard(SpinLock_); + if (!Started_ || Busy_) { + return; + } + Busy_ = true; + ExecutingCallback_ = true; + ExecutionCanceler_ = GetCurrentFiberCanceler(); + TDelayedExecutor::CancelAndClear(Cookie_); + if (ExecutedPromise_) { + executedPromise = ExecutedPromise_; + ExecutedPromise_ = TPromise<void>(); + } + if (IdlePromise_) { + IdlePromise_ = NewPromise<void>(); + } + } + + auto cleanup = [=, this] (bool aborted) { + if (aborted) { + return; + } + + TPromise<void> idlePromise; + { + auto guard = Guard(SpinLock_); + idlePromise = IdlePromise_; + ExecutingCallback_ = false; + ExecutionCanceler_.Reset(); + } + + if (idlePromise) { + idlePromise.TrySet(); + } + + if (executedPromise) { + executedPromise.TrySet(); + } + + auto guard = Guard(SpinLock_); + + YT_VERIFY(Busy_); + Busy_ = false; + + if (!Started_) { + return; + } + if (OutOfBandRequested_) { + OutOfBandRequested_ = false; + guard.Release(); + PostCallback(); + } else { + ScheduleCallback(); + } + }; + + try { + Callback_(); + } catch (const TFiberCanceledException&) { + // There's very little we can do here safely; + // in particular, we should refrain from setting promises; + // let's forward the call to the delayed executor. + TDelayedExecutor::Submit( + BIND([this_ = MakeStrong(this), cleanup = std::move(cleanup)] (bool aborted) { + cleanup(aborted); + }), + TDuration::Zero()); + throw; + } + + cleanup(false); +} + +void TRecurringExecutorBase::OnCallbackInvocationFailed() +{ + auto guard = Guard(SpinLock_); + + if (!Started_) { + return; + } + + ScheduleCallback(); +} + +void TRecurringExecutorBase::KickStartInvocationIfNeeded() +{ + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + if (Started_ && !Busy_) { + ScheduleFirstCallback(); + } +} + +bool TRecurringExecutorBase::IsExecutingCallback() const +{ + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + return ExecutingCallback_; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/recurring_executor_base.h b/yt/yt/core/concurrency/recurring_executor_base.h new file mode 100644 index 00000000000..76d5ed24e2b --- /dev/null +++ b/yt/yt/core/concurrency/recurring_executor_base.h @@ -0,0 +1,90 @@ +#pragma once + +#include "public.h" +#include "delayed_executor.h" + +#include <yt/yt/core/actions/callback.h> +#include <yt/yt/core/actions/future.h> + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +//! Abstract base class providing possibility to perform actions with certain schedule. +class TRecurringExecutorBase + : public TRefCounted +{ +public: + //! Initializes an instance. + /*! + * \note + * We must call #Start to activate the instance. + * + * \param invoker Invoker used for wrapping actions. + * \param callback Callback to invoke according to the schedule. + */ + TRecurringExecutorBase( + IInvokerPtr invoker, + TClosure callback); + + //! Starts the instance. + //! The first invocation happens with a delay defined by the class heir. + void Start(); + + //! Stops the instance, cancels all subsequent invocations. + //! Returns a future that becomes set when all outstanding callback + //! invocations are finished and no more invocations are expected to happen. + TFuture<void> Stop(); + + //! Requests an immediate invocation. + void ScheduleOutOfBand(); + + //! Returns the future that become set when + //! at least one action be fully executed from the moment of method call. + //! Cancellation of the returned future will not affect the action + //! or other futures returned by this method. + TFuture<void> GetExecutedEvent(); + +private: + const IInvokerPtr Invoker_; + const TClosure Callback_; + + bool Started_ = false; + bool Busy_ = false; + bool OutOfBandRequested_ = false; + bool ExecutingCallback_ = false; + TCallback<void(const TError&)> ExecutionCanceler_; + TDelayedExecutorCookie Cookie_; + TPromise<void> IdlePromise_; + TPromise<void> ExecutedPromise_; + + void InitIdlePromise(); + void InitExecutedPromise(); + + void OnTimer(bool aborted); + void OnCallbackSuccess(); + void OnCallbackInvocationFailed(); + + void DoStop(TGuard<NThreading::TSpinLock>& guard); + +protected: + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); + + void PostCallbackWithDeadline(TInstant deadline); + void PostDelayedCallback(TDuration delay); + void PostCallback(); + + void KickStartInvocationIfNeeded(); + + bool IsExecutingCallback() const; + + virtual void ScheduleFirstCallback() = 0; + virtual void ScheduleCallback() = 0; + + virtual TError MakeStoppedError() = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency + diff --git a/yt/yt/core/concurrency/scheduled_executor.cpp b/yt/yt/core/concurrency/scheduled_executor.cpp new file mode 100644 index 00000000000..5eef530e4dd --- /dev/null +++ b/yt/yt/core/concurrency/scheduled_executor.cpp @@ -0,0 +1,63 @@ +#include "scheduled_executor.h" + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +TScheduledExecutor::TScheduledExecutor( + IInvokerPtr invoker, + TClosure callback, + std::optional<TDuration> interval) + : TRecurringExecutorBase(std::move(invoker), std::move(callback)) + , Interval_(interval) +{ + YT_VERIFY(!Interval_ || Interval_ != TDuration::Zero()); +} + +void TScheduledExecutor::SetInterval(std::optional<TDuration> interval) +{ + YT_VERIFY(!interval || interval != TDuration::Zero()); + + auto guard = Guard(SpinLock_); + + auto oldInterval = Interval_; + Interval_ = interval; + + if (interval && !(oldInterval && oldInterval->GetValue() % oldInterval->GetValue() == 0)) { + KickStartInvocationIfNeeded(); + } +} + +void TScheduledExecutor::ScheduleFirstCallback() +{ + ScheduleCallback(); +} + +void TScheduledExecutor::ScheduleCallback() +{ + if (Interval_) { + PostCallbackWithDeadline(NextDeadline()); + } +} + +TError TScheduledExecutor::MakeStoppedError() +{ + return TError(NYT::EErrorCode::Canceled, "Interval executor is stopped"); +} + +TInstant TScheduledExecutor::NextDeadline() +{ + VERIFY_SPINLOCK_AFFINITY(SpinLock_); + + YT_VERIFY(Interval_); + + // TInstant and TDuration are guaranteed to have same precision. + const auto& intervalValue = Interval_->GetValue(); + const auto& nowValue = TInstant::Now().GetValue(); + + return TInstant::FromValue(nowValue + (intervalValue - nowValue % intervalValue)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/scheduled_executor.h b/yt/yt/core/concurrency/scheduled_executor.h new file mode 100644 index 00000000000..bc35208d53c --- /dev/null +++ b/yt/yt/core/concurrency/scheduled_executor.h @@ -0,0 +1,52 @@ +#pragma once + +#include "public.h" +#include "recurring_executor_base.h" + +namespace NYT::NConcurrency { + +//////////////////////////////////////////////////////////////////////////////// + +//! Invokes callbacks according to a primitive schedule. +//! Given a set non-zero interval, callbacks will be executed at times, which are a multiple of this interval. +//! E.g. if the interval is 5 minutes, callbacks will be executed at 00:00, 00:05, 00:10, etc. +class TScheduledExecutor + : public TRecurringExecutorBase +{ +public: + //! Initializes an instance. + /*! + * \note + * We must call #Start to activate the instance. + * + * \param invoker Invoker used for wrapping actions. + * \param callback Callback to invoke periodically. + * \param interval Determines the moments of time at which the callback will be executed. + */ + TScheduledExecutor( + IInvokerPtr invoker, + TClosure callback, + std::optional<TDuration> interval); + + void SetInterval(std::optional<TDuration> interval); + +protected: + void ScheduleFirstCallback() override; + void ScheduleCallback() override; + + TError MakeStoppedError() override; + +private: + std::optional<TDuration> Interval_; + + //! Returns the next time instant which is a multiple of the configured interval. + //! NB: If the current instant is itself a multiple of the configured interval, this method will return the next + //! suitable instant. + TInstant NextDeadline(); +}; + +DEFINE_REFCOUNTED_TYPE(TScheduledExecutor) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/periodic_ut.cpp b/yt/yt/core/concurrency/unittests/periodic_ut.cpp index c5309d1175e..d033ed3f934 100644 --- a/yt/yt/core/concurrency/unittests/periodic_ut.cpp +++ b/yt/yt/core/concurrency/unittests/periodic_ut.cpp @@ -29,6 +29,7 @@ class TPeriodicTest TEST_W(TPeriodicTest, Simple) { std::atomic<int> count = {0}; + auto callback = BIND([&] () { TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(200)); ++count; @@ -61,11 +62,38 @@ TEST_W(TPeriodicTest, Simple) .ThrowOnError(); } -// TODO(achulkov2): Add simple out of band execution test. +TEST_W(TPeriodicTest, SimpleScheduleOutOfBand) +{ + std::atomic<int> count = {0}; + + auto callback = BIND([&] () { + ++count; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TPeriodicExecutor>( + actionQueue->GetInvoker(), + callback, + TDuration::MilliSeconds(300)); + + executor->Start(); + // Wait for first invocation. + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(20)); + + auto future = executor->GetExecutedEvent(); + const auto& now = TInstant::Now(); + executor->ScheduleOutOfBand(); + WaitFor(future) + .ThrowOnError(); + auto executionDuration = TInstant::Now() - now; + EXPECT_LT(executionDuration, TDuration::MilliSeconds(20)); + EXPECT_EQ(2, count.load()); +} TEST_W(TPeriodicTest, ParallelStop) { std::atomic<int> count = {0}; + auto callback = BIND([&] () { ++count; TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500)); @@ -108,6 +136,7 @@ TEST_W(TPeriodicTest, ParallelOnExecuted1) TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500)); ++count; }); + auto actionQueue = New<TActionQueue>(); auto executor = New<TPeriodicExecutor>( actionQueue->GetInvoker(), @@ -144,6 +173,7 @@ TEST_W(TPeriodicTest, ParallelOnExecuted2) TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); ++count; }); + auto actionQueue = New<TActionQueue>(); auto executor = New<TPeriodicExecutor>( actionQueue->GetInvoker(), @@ -180,6 +210,7 @@ TEST_W(TPeriodicTest, OnExecutedEventCanceled) TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); ++count; }); + auto actionQueue = New<TActionQueue>(); auto executor = New<TPeriodicExecutor>( actionQueue->GetInvoker(), @@ -206,9 +237,11 @@ TEST_W(TPeriodicTest, Stop) { auto neverSetPromise = NewPromise<void>(); auto immediatelyCancelableFuture = neverSetPromise.ToFuture().ToImmediatelyCancelable(); + auto callback = BIND([&] { WaitUntilSet(immediatelyCancelableFuture); }); + auto actionQueue = New<TActionQueue>(); auto executor = New<TPeriodicExecutor>( actionQueue->GetInvoker(), diff --git a/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp new file mode 100644 index 00000000000..c225667c3a3 --- /dev/null +++ b/yt/yt/core/concurrency/unittests/scheduled_executor_ut.cpp @@ -0,0 +1,263 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <yt/yt/core/actions/invoker_util.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/delayed_executor.h> +#include <yt/yt/core/concurrency/scheduled_executor.h> + +#include <atomic> + +namespace NYT::NConcurrency { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +class TScheduledExecutorTest + : public ::testing::Test +{ }; + +//////////////////////////////////////////////////////////////////////////////// + +void CheckTimeSlotCorrectness(const TDuration& interval) +{ + const auto& now = TInstant::Now(); + auto lastTick = TInstant::FromValue((now.GetValue() / interval.GetValue()) * interval.GetValue()); + YT_VERIFY(now - lastTick <= TDuration::MilliSeconds(10)); +} + +TEST_W(TScheduledExecutorTest, Simple) +{ + const auto& interval = TDuration::MilliSeconds(200); + std::atomic<int> count = {0}; + + auto callback = BIND([&] () { + CheckTimeSlotCorrectness(interval); + ++count; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TScheduledExecutor>( + actionQueue->GetInvoker(), + callback, + interval); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300)); + WaitFor(executor->Stop()) + .ThrowOnError(); + EXPECT_LE(1, count.load()); + EXPECT_GE(2, count.load()); +} + +TEST_W(TScheduledExecutorTest, SimpleScheduleOutOfBand) +{ + std::atomic<int> count = {0}; + + auto callback = BIND([&] () { + ++count; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TScheduledExecutor>( + actionQueue->GetInvoker(), + callback, + TDuration::MilliSeconds(300)); + + executor->Start(); + TDuration executionDuration; + { + auto future1 = executor->GetExecutedEvent(); + auto future2 = executor->GetExecutedEvent(); + const auto& now = TInstant::Now(); + executor->ScheduleOutOfBand(); + WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2}))) + .ThrowOnError(); + executionDuration = TInstant::Now() - now; + } + EXPECT_EQ(1, count.load()); + EXPECT_GT(TDuration::MilliSeconds(20), executionDuration); +} + +TEST_W(TScheduledExecutorTest, ParallelStop) +{ + const auto& interval = TDuration::MilliSeconds(10); + std::atomic<int> count = {0}; + + auto callback = BIND([&] () { + CheckTimeSlotCorrectness(interval); + ++count; + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500)); + ++count; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TScheduledExecutor>( + actionQueue->GetInvoker(), + callback, + interval); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300)); + { + auto future1 = executor->Stop(); + auto future2 = executor->Stop(); + WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2}))) + .ThrowOnError(); + } + EXPECT_EQ(1, count.load()); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300)); + { + auto future1 = executor->Stop(); + auto future2 = executor->Stop(); + auto future3 = executor->Stop(); + WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2, future3}))) + .ThrowOnError(); + } + EXPECT_EQ(2, count.load()); +} + +TEST_W(TScheduledExecutorTest, ParallelOnExecuted1) +{ + const auto& interval = TDuration::MilliSeconds(10); + std::atomic<int> count = 0; + + auto callback = BIND([&] () { + CheckTimeSlotCorrectness(interval); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(500)); + ++count; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TScheduledExecutor>( + actionQueue->GetInvoker(), + callback, + interval); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(300)); + { + auto future1 = executor->GetExecutedEvent(); + auto future2 = executor->GetExecutedEvent(); + WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2}))) + .ThrowOnError(); + } + EXPECT_EQ(2, count.load()); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(450)); + { + auto future1 = executor->GetExecutedEvent(); + auto future2 = executor->GetExecutedEvent(); + auto future3 = executor->GetExecutedEvent(); + WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2, future3}))) + .ThrowOnError(); + } + EXPECT_EQ(4, count.load()); +} + +TEST_W(TScheduledExecutorTest, ParallelOnExecuted2) +{ + const auto& interval = TDuration::MilliSeconds(400); + std::atomic<int> count = 0; + + auto callback = BIND([&] () { + CheckTimeSlotCorrectness(interval); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); + ++count; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TScheduledExecutor>( + actionQueue->GetInvoker(), + callback, + interval); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(400)); + { + auto future1 = executor->GetExecutedEvent(); + auto future2 = executor->GetExecutedEvent(); + WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2}))) + .ThrowOnError(); + } + EXPECT_EQ(2, count.load()); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); + { + auto future1 = executor->GetExecutedEvent(); + auto future2 = executor->GetExecutedEvent(); + auto future3 = executor->GetExecutedEvent(); + WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2, future3}))) + .ThrowOnError(); + } + EXPECT_EQ(3, count.load()); +} + +TEST_W(TScheduledExecutorTest, OnExecutedEventCanceled) +{ + const auto& interval = TDuration::MilliSeconds(50); + std::atomic<int> count = 0; + + auto callback = BIND([&] () { + CheckTimeSlotCorrectness(interval); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(200)); + ++count; + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TScheduledExecutor>( + actionQueue->GetInvoker(), + callback, + interval); + + executor->Start(); + TDelayedExecutor::WaitForDuration(TDuration::MilliSeconds(100)); + { + auto future1 = executor->GetExecutedEvent(); + auto future2 = executor->GetExecutedEvent(); + + // Cancellation of the executed event future must not propagate to the underlying event. + auto future3 = executor->GetExecutedEvent(); + future3.Cancel(TError(NYT::EErrorCode::Canceled, "Canceled")); + + EXPECT_NO_THROW(WaitFor(AllSucceeded(std::vector<TFuture<void>>({future1, future2}))) + .ThrowOnError()); + } + EXPECT_EQ(2, count.load()); +} + +TEST_W(TScheduledExecutorTest, Stop) +{ + const auto& interval = TDuration::MilliSeconds(20); + auto neverSetPromise = NewPromise<void>(); + auto immediatelyCancelableFuture = neverSetPromise.ToFuture().ToImmediatelyCancelable(); + + auto callback = BIND([&] { + CheckTimeSlotCorrectness(interval); + WaitUntilSet(immediatelyCancelableFuture); + }); + + auto actionQueue = New<TActionQueue>(); + auto executor = New<TScheduledExecutor>( + actionQueue->GetInvoker(), + callback, + interval); + + executor->Start(); + // Wait for the callback to enter WaitFor. + Sleep(TDuration::MilliSeconds(100)); + WaitFor(executor->Stop()) + .ThrowOnError(); + + EXPECT_TRUE(immediatelyCancelableFuture.IsSet()); + EXPECT_EQ(NYT::EErrorCode::Canceled, immediatelyCancelableFuture.Get().GetCode()); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/unittests/ya.make b/yt/yt/core/concurrency/unittests/ya.make index b507752539a..af74851a821 100644 --- a/yt/yt/core/concurrency/unittests/ya.make +++ b/yt/yt/core/concurrency/unittests/ya.make @@ -28,6 +28,7 @@ SRCS( periodic_ut.cpp propagating_storage_ut.cpp quantized_executor_ut.cpp + scheduled_executor_ut.cpp scheduler_ut.cpp suspendable_action_queue_ut.cpp thread_affinity_ut.cpp diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index db4201c01cf..e985dd82aea 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -81,6 +81,8 @@ SRCS( concurrency/thread_pool.cpp concurrency/throughput_throttler.cpp concurrency/two_level_fair_share_thread_pool.cpp + concurrency/recurring_executor_base.cpp + concurrency/scheduled_executor.cpp crypto/config.cpp crypto/crypto.cpp |