diff options
author | babenko <babenko@yandex-team.com> | 2025-01-11 17:52:46 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2025-01-11 18:13:18 +0300 |
commit | 291b8c2ac8281e6511c2ecaf102514e9906b9829 (patch) | |
tree | fbc611a102830a6b481719aaf2d88545e0d0a34a | |
parent | 53a0b39ec41083775dd7ca2f9bbb4bea1fcd33f3 (diff) | |
download | ydb-291b8c2ac8281e6511c2ecaf102514e9906b9829.tar.gz |
YT-23914: Rewrite wait time observers
In particular, enable multiple observers per invoker. This is a prerequisite for running multiple node instances in multdaemon.
commit_hash:d7edfb57d945915770417a66dbcf0a3c34e5bc2e
-rw-r--r-- | yt/yt/core/actions/callback_list-inl.h (renamed from yt/yt/core/actions/signal-inl.h) | 33 | ||||
-rw-r--r-- | yt/yt/core/actions/callback_list.h | 179 | ||||
-rw-r--r-- | yt/yt/core/actions/invoker.h | 10 | ||||
-rw-r--r-- | yt/yt/core/actions/invoker_detail.cpp | 10 | ||||
-rw-r--r-- | yt/yt/core/actions/invoker_detail.h | 2 | ||||
-rw-r--r-- | yt/yt/core/actions/invoker_util.cpp | 10 | ||||
-rw-r--r-- | yt/yt/core/actions/signal.h | 169 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fair_share_thread_pool.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/concurrency/invoker_queue.cpp | 27 | ||||
-rw-r--r-- | yt/yt/core/concurrency/invoker_queue.h | 5 | ||||
-rw-r--r-- | yt/yt/core/concurrency/new_fair_share_thread_pool.cpp | 37 | ||||
-rw-r--r-- | yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp | 34 | ||||
-rw-r--r-- | yt/yt/core/concurrency/two_level_fair_share_thread_pool.h | 9 |
13 files changed, 297 insertions, 233 deletions
diff --git a/yt/yt/core/actions/signal-inl.h b/yt/yt/core/actions/callback_list-inl.h index 337e760df8..8e607a43a3 100644 --- a/yt/yt/core/actions/signal-inl.h +++ b/yt/yt/core/actions/callback_list-inl.h @@ -1,11 +1,9 @@ -#ifndef SIGNAL_INL_H_ -#error "Direct inclusion of this file is not allowed, include signal.h" +#ifndef CALLBACK_LIST_INL_H_ +#error "Direct inclusion of this file is not allowed, include callback_list.h" // For the sake of sane code completion. -#include "signal.h" +#include "callback_list.h" #endif -#undef SIGNAL_INL_H_ - -#include <library/cpp/yt/memory/ref.h> +#undef CALLBACK_LIST_INL_H_ namespace NYT { @@ -16,6 +14,7 @@ void TCallbackList<TResult(TArgs...)>::Subscribe(const TCallback& callback) { auto guard = WriterGuard(SpinLock_); Callbacks_.push_back(callback); + Empty_.store(false); } template <class TResult, class... TArgs> @@ -28,11 +27,15 @@ void TCallbackList<TResult(TArgs...)>::Unsubscribe(const TCallback& callback) break; } } + Empty_.store(Callbacks_.empty(), std::memory_order::release); } template <class TResult, class... TArgs> std::vector<TCallback<TResult(TArgs...)>> TCallbackList<TResult(TArgs...)>::ToVector() const { + if (IsEmpty()) [[likely]] { + return {}; + } auto guard = ReaderGuard(SpinLock_); return std::vector<TCallback>(Callbacks_.begin(), Callbacks_.end()); } @@ -40,15 +43,17 @@ std::vector<TCallback<TResult(TArgs...)>> TCallbackList<TResult(TArgs...)>::ToVe template <class TResult, class... TArgs> int TCallbackList<TResult(TArgs...)>::Size() const { + if (IsEmpty()) [[likely]] { + return 0; + } auto guard = ReaderGuard(SpinLock_); return Callbacks_.size(); } template <class TResult, class... TArgs> -bool TCallbackList<TResult(TArgs...)>::Empty() const +bool TCallbackList<TResult(TArgs...)>::IsEmpty() const { - auto guard = ReaderGuard(SpinLock_); - return Callbacks_.empty(); + return Empty_.load(std::memory_order::acquire); } template <class TResult, class... TArgs> @@ -56,12 +61,17 @@ void TCallbackList<TResult(TArgs...)>::Clear() { auto guard = WriterGuard(SpinLock_); Callbacks_.clear(); + Empty_.store(true, std::memory_order::release); } template <class TResult, class... TArgs> template <class... TCallArgs> void TCallbackList<TResult(TArgs...)>::Fire(TCallArgs&&... args) const { + if (IsEmpty()) [[likely]] { + return; + } + TCallbackVector callbacks; { auto guard = ReaderGuard(SpinLock_); @@ -77,10 +87,15 @@ template <class TResult, class... TArgs> template <class... TCallArgs> void TCallbackList<TResult(TArgs...)>::FireAndClear(TCallArgs&&... args) { + if (IsEmpty()) [[likely]] { + return; + } + TCallbackVector callbacks; { auto guard = WriterGuard(SpinLock_); callbacks.swap(Callbacks_); + Empty_.store(true, std::memory_order::release); } for (const auto& callback : callbacks) { diff --git a/yt/yt/core/actions/callback_list.h b/yt/yt/core/actions/callback_list.h new file mode 100644 index 0000000000..3de80024ab --- /dev/null +++ b/yt/yt/core/actions/callback_list.h @@ -0,0 +1,179 @@ +#pragma once + +#include "callback.h" + +#include <library/cpp/yt/small_containers/compact_vector.h> + +#include <library/cpp/yt/threading/rw_spin_lock.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +/*! + * A client may subscribe to a list (adding a new handler to it), + * unsubscribe from it (removing an earlier added handler), + * and fire it thus invoking the callbacks added so far. + * + * Thread affinity: any. + */ +template <class TSignature> +class TCallbackList +{ }; + +template <class TResult, class... TArgs> +class TCallbackList<TResult(TArgs...)> +{ +public: + using TCallback = NYT::TCallback<TResult(TArgs...)>; + + //! Adds a new handler to the list. + /*! + * \param callback A handler to be added. + */ + void Subscribe(const TCallback& callback); + + //! Removes a handler from the list. + /*! + * \param callback A handler to be removed. + */ + void Unsubscribe(const TCallback& callback); + + //! Returns the vector of currently added callbacks. + std::vector<TCallback> ToVector() const; + + //! Returns the number of handlers. + int Size() const; + + //! Returns |true| if there are no handlers. + bool IsEmpty() const; + + //! Clears the list of handlers. + void Clear(); + + //! Runs all handlers in the list. + //! The return values (if any) are ignored. + template <class... TCallArgs> + void Fire(TCallArgs&&... args) const; + + //! Runs all handlers in the list and clears the list. + //! The return values (if any) are ignored. + template <class... TCallArgs> + void FireAndClear(TCallArgs&&... args); + +private: + std::atomic<bool> Empty_ = true; + + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_); + using TCallbackVector = TCompactVector<TCallback, 4>; + TCallbackVector Callbacks_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/*! + * Similar to TCallbackList, but single-threaded and copyable. + * + * Cannot be used from multiple threads. + */ +template <class TSignature> +class TSimpleCallbackList +{ }; + +template <class TResult, class... TArgs> +class TSimpleCallbackList<TResult(TArgs...)> +{ +public: + using TCallback = NYT::TCallback<TResult(TArgs...)>; + + //! Adds a new handler to the list. + /*! + * \param callback A handler to be added. + */ + void Subscribe(const TCallback& callback); + + //! Removes a handler from the list. + /*! + * \param callback A handler to be removed. + */ + void Unsubscribe(const TCallback& callback); + + //! Runs all handlers in the list. + //! The return values (if any) are ignored. + template <class... TCallArgs> + void Fire(TCallArgs&&... args) const; + +private: + using TCallbackVector = TCompactVector<TCallback, 4>; + TCallbackVector Callbacks_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +/*! + * Similar to TCallbackList but can only be fired once. + * When fired, captures the arguments and in subsequent calls + * to Subscribe instantly invokes the subscribers. + * + * Thread affinity: any. + */ +template <class TSignature> +class TSingleShotCallbackList +{ }; + +template <class TResult, class... TArgs> +class TSingleShotCallbackList<TResult(TArgs...)> +{ +public: + using TCallback = NYT::TCallback<TResult(TArgs...)>; + + //! Adds a new handler to the list. + /*! + * If the list was already fired then #callback is invoked in situ. + * \param callback A handler to be added. + */ + void Subscribe(const TCallback& callback); + + //! Tries to add a new handler to the list. + /*! + * If the list was already fired then returns |false|. + * Otherwise atomically installs the handler. + * \param callback A handler to be added. + */ + bool TrySubscribe(const TCallback& callback); + + //! Removes a handler from the list. + /*! + * \param callback A handler to be removed. + */ + void Unsubscribe(const TCallback& callback); + + //! Returns the vector of currently added callbacks. + std::vector<TCallback> ToVector() const; + + //! Runs all handlers in the list. + //! The return values (if any) are ignored. + /*! + * \returns |true| if this is the first attempt to fire the list. + */ + template <class... TCallArgs> + bool Fire(TCallArgs&&... args); + + //! \returns |true| if the list was fired. + bool IsFired() const; + +private: + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_); + std::atomic<bool> Fired_ = false; + using TCallbackVector = TCompactVector<TCallback, 4>; + TCallbackVector Callbacks_; + std::tuple<typename std::decay<TArgs>::type...> Args_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define CALLBACK_LIST_INL_H_ +#include "callback_list-inl.h" +#undef CALLBACK_LIST_INL_H_ diff --git a/yt/yt/core/actions/invoker.h b/yt/yt/core/actions/invoker.h index 21128f0c4f..ca0d33ccae 100644 --- a/yt/yt/core/actions/invoker.h +++ b/yt/yt/core/actions/invoker.h @@ -1,8 +1,10 @@ #pragma once #include "callback.h" +#include "signal.h" #include <yt/yt/core/threading/public.h> + #include <library/cpp/yt/memory/range.h> #include <type_traits> @@ -38,11 +40,11 @@ struct IInvoker */ virtual bool IsSerialized() const = 0; - using TWaitTimeObserver = std::function<void(TDuration waitTime)>; - //! Registers a callback that could be invoked to inform - //! of the current wait time for invocations via this invoker. + //! Invoked to inform of the current wait time for invocations via this invoker. //! These invocations, however, are not guaranteed. - virtual void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) = 0; + using TWaitTimeObserver = TCallback<void(TDuration waitTime)>; + + DECLARE_INTERFACE_SIGNAL(TWaitTimeObserver::TSignature, WaitTimeObserved); }; DEFINE_REFCOUNTED_TYPE(IInvoker) diff --git a/yt/yt/core/actions/invoker_detail.cpp b/yt/yt/core/actions/invoker_detail.cpp index d33514ae46..fc0fd7b308 100644 --- a/yt/yt/core/actions/invoker_detail.cpp +++ b/yt/yt/core/actions/invoker_detail.cpp @@ -46,9 +46,15 @@ bool TInvokerWrapper<VirtualizeBase>::IsSerialized() const } template <bool VirtualizeBase> -void TInvokerWrapper<VirtualizeBase>::RegisterWaitTimeObserver(IInvoker::TWaitTimeObserver waitTimeObserver) +void TInvokerWrapper<VirtualizeBase>::SubscribeWaitTimeObserved(const IInvoker::TWaitTimeObserver& callback) { - return UnderlyingInvoker_->RegisterWaitTimeObserver(waitTimeObserver); + return UnderlyingInvoker_->SubscribeWaitTimeObserved(callback); +} + +template <bool VirtualizeBase> +void TInvokerWrapper<VirtualizeBase>::UnsubscribeWaitTimeObserved(const IInvoker::TWaitTimeObserver& callback) +{ + return UnderlyingInvoker_->SubscribeWaitTimeObserved(callback); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/actions/invoker_detail.h b/yt/yt/core/actions/invoker_detail.h index c031ac0dc7..1f662dccd4 100644 --- a/yt/yt/core/actions/invoker_detail.h +++ b/yt/yt/core/actions/invoker_detail.h @@ -39,7 +39,7 @@ public: NThreading::TThreadId GetThreadId() const override; bool CheckAffinity(const IInvokerPtr& invoker) const override; bool IsSerialized() const override; - void RegisterWaitTimeObserver(IInvoker::TWaitTimeObserver waitTimeObserver) override; + DECLARE_SIGNAL_OVERRIDE(IInvoker::TWaitTimeObserver::TSignature, WaitTimeObserved); protected: const IInvokerPtr UnderlyingInvoker_; diff --git a/yt/yt/core/actions/invoker_util.cpp b/yt/yt/core/actions/invoker_util.cpp index a50fb6a1c4..a4f47afb24 100644 --- a/yt/yt/core/actions/invoker_util.cpp +++ b/yt/yt/core/actions/invoker_util.cpp @@ -83,7 +83,10 @@ public: return InvalidThreadId; } - void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override + { } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override { } private: @@ -128,7 +131,10 @@ public: return InvalidThreadId; } - void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override + { } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override { } }; diff --git a/yt/yt/core/actions/signal.h b/yt/yt/core/actions/signal.h index b46615c415..395249ad5d 100644 --- a/yt/yt/core/actions/signal.h +++ b/yt/yt/core/actions/signal.h @@ -1,175 +1,12 @@ #pragma once #include "callback.h" - -#include <library/cpp/yt/small_containers/compact_vector.h> - -#include <library/cpp/yt/threading/rw_spin_lock.h> +#include "callback_list.h" namespace NYT { //////////////////////////////////////////////////////////////////////////////// -/*! - * A client may subscribe to a list (adding a new handler to it), - * unsubscribe from it (removing an earlier added handler), - * and fire it thus invoking the callbacks added so far. - * - * Thread affinity: any. - */ -template <class TSignature> -class TCallbackList -{ }; - -template <class TResult, class... TArgs> -class TCallbackList<TResult(TArgs...)> -{ -public: - using TCallback = NYT::TCallback<TResult(TArgs...)>; - - //! Adds a new handler to the list. - /*! - * \param callback A handler to be added. - */ - void Subscribe(const TCallback& callback); - - //! Removes a handler from the list. - /*! - * \param callback A handler to be removed. - */ - void Unsubscribe(const TCallback& callback); - - //! Returns the vector of currently added callbacks. - std::vector<TCallback> ToVector() const; - - //! Returns the number of handlers. - int Size() const; - - //! Returns |true| if there are no handlers. - bool Empty() const; - - //! Clears the list of handlers. - void Clear(); - - //! Runs all handlers in the list. - //! The return values (if any) are ignored. - template <class... TCallArgs> - void Fire(TCallArgs&&... args) const; - - //! Runs all handlers in the list and clears the list. - //! The return values (if any) are ignored. - template <class... TCallArgs> - void FireAndClear(TCallArgs&&... args); - -private: - YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_); - using TCallbackVector = TCompactVector<TCallback, 4>; - TCallbackVector Callbacks_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -/*! - * Similar to TCallbackList, but single-threaded and copyable. - * - * Cannot be used from multiple threads. - */ -template <class TSignature> -class TSimpleCallbackList -{ }; - -template <class TResult, class... TArgs> -class TSimpleCallbackList<TResult(TArgs...)> -{ -public: - using TCallback = NYT::TCallback<TResult(TArgs...)>; - - //! Adds a new handler to the list. - /*! - * \param callback A handler to be added. - */ - void Subscribe(const TCallback& callback); - - //! Removes a handler from the list. - /*! - * \param callback A handler to be removed. - */ - void Unsubscribe(const TCallback& callback); - - //! Runs all handlers in the list. - //! The return values (if any) are ignored. - template <class... TCallArgs> - void Fire(TCallArgs&&... args) const; - -private: - using TCallbackVector = TCompactVector<TCallback, 4>; - TCallbackVector Callbacks_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -/*! - * Similar to TCallbackList but can only be fired once. - * When fired, captures the arguments and in subsequent calls - * to Subscribe instantly invokes the subscribers. - * - * Thread affinity: any. - */ -template <class TSignature> -class TSingleShotCallbackList -{ }; - -template <class TResult, class... TArgs> -class TSingleShotCallbackList<TResult(TArgs...)> -{ -public: - using TCallback = NYT::TCallback<TResult(TArgs...)>; - - //! Adds a new handler to the list. - /*! - * If the list was already fired then #callback is invoked in situ. - * \param callback A handler to be added. - */ - void Subscribe(const TCallback& callback); - - //! Tries to add a new handler to the list. - /*! - * If the list was already fired then returns |false|. - * Otherwise atomically installs the handler. - * \param callback A handler to be added. - */ - bool TrySubscribe(const TCallback& callback); - - //! Removes a handler from the list. - /*! - * \param callback A handler to be removed. - */ - void Unsubscribe(const TCallback& callback); - - //! Returns the vector of currently added callbacks. - std::vector<TCallback> ToVector() const; - - //! Runs all handlers in the list. - //! The return values (if any) are ignored. - /*! - * \returns |true| if this is the first attempt to fire the list. - */ - template <class... TCallArgs> - bool Fire(TCallArgs&&... args); - - //! \returns |true| if the list was fired. - bool IsFired() const; - -private: - YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_); - std::atomic<bool> Fired_ = false; - using TCallbackVector = TCompactVector<TCallback, 4>; - TCallbackVector Callbacks_; - std::tuple<typename std::decay<TArgs>::type...> Args_; -}; - -//////////////////////////////////////////////////////////////////////////////// - #define DEFINE_SIGNAL(TSignature, name) \ protected: \ ::NYT::TCallbackList<TSignature> name##_; \ @@ -266,7 +103,3 @@ public: \ //////////////////////////////////////////////////////////////////////////////// } // namespace NYT - -#define SIGNAL_INL_H_ -#include "signal-inl.h" -#undef SIGNAL_INL_H_ diff --git a/yt/yt/core/concurrency/fair_share_thread_pool.cpp b/yt/yt/core/concurrency/fair_share_thread_pool.cpp index a5a4d9285d..fb8dec13cf 100644 --- a/yt/yt/core/concurrency/fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_thread_pool.cpp @@ -71,7 +71,10 @@ public: return false; } - void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override + { } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override { } ~TBucket(); diff --git a/yt/yt/core/concurrency/invoker_queue.cpp b/yt/yt/core/concurrency/invoker_queue.cpp index 019589501a..eb871622f7 100644 --- a/yt/yt/core/concurrency/invoker_queue.cpp +++ b/yt/yt/core/concurrency/invoker_queue.cpp @@ -269,10 +269,17 @@ public: } } - void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override { if (auto queue = Queue_.Lock()) { - queue->RegisterWaitTimeObserver(waitTimeObserver); + queue->SubscribeWaitTimeObserved(callback); + } + } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override + { + if (auto queue = Queue_.Lock()) { + queue->UnsubscribeWaitTimeObserved(callback); } } @@ -597,9 +604,7 @@ bool TInvokerQueue<TQueueImpl>::BeginExecute(TEnqueuedAction* action, typename T auto waitTime = CpuDurationToDuration(action->StartedAt - action->EnqueuedAt); - if (IsWaitTimeObserverSet_.load()) { - WaitTimeObserver_(waitTime); - } + WaitTimeObserved_.Fire(waitTime); if (Counters_[action->ProfilingTag]) { Counters_[action->ProfilingTag]->DequeuedCounter.Increment(); @@ -676,13 +681,15 @@ IInvoker* TInvokerQueue<TQueueImpl>::GetProfilingTagSettingInvoker(int profiling } template <class TQueueImpl> -void TInvokerQueue<TQueueImpl>::RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) +void TInvokerQueue<TQueueImpl>::SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) { - WaitTimeObserver_ = waitTimeObserver; - auto alreadyInitialized = IsWaitTimeObserverSet_.exchange(true); + WaitTimeObserved_.Subscribe(callback); +} - // Multiple observers are forbidden. - YT_VERIFY(!alreadyInitialized); +template <class TQueueImpl> +void TInvokerQueue<TQueueImpl>::UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) +{ + WaitTimeObserved_.Unsubscribe(callback); } template <class TQueueImpl> diff --git a/yt/yt/core/concurrency/invoker_queue.h b/yt/yt/core/concurrency/invoker_queue.h index e4a978aeae..95719e28fa 100644 --- a/yt/yt/core/concurrency/invoker_queue.h +++ b/yt/yt/core/concurrency/invoker_queue.h @@ -192,7 +192,7 @@ public: IInvoker* GetProfilingTagSettingInvoker(int profilingTag); - void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override; + DECLARE_SIGNAL_OVERRIDE(TWaitTimeObserver::TSignature, WaitTimeObserved); private: const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_; @@ -220,8 +220,7 @@ private: std::vector<IInvokerPtr> ProfilingTagSettingInvokers_; - std::atomic<bool> IsWaitTimeObserverSet_; - TWaitTimeObserver WaitTimeObserver_; + TCallbackList<TWaitTimeObserver::TSignature> WaitTimeObserved_; TCountersPtr CreateCounters(const NProfiling::TTagSet& tagSet, NProfiling::IRegistryPtr registry); diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp index 0c8334f323..fec716623d 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp @@ -366,7 +366,10 @@ public: return invoker.Get() == this; } - void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override + { } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override { } private: @@ -788,13 +791,14 @@ public: } - void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) + void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) { - WaitTimeObserver_ = waitTimeObserver; - auto alreadyInitialized = IsWaitTimeObserverSet_.exchange(true); + WaitTimeObservers_.Subscribe(callback); + } - // Multiple observers are forbidden. - YT_VERIFY(!alreadyInitialized); + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) + { + WaitTimeObservers_.Unsubscribe(callback); } private: @@ -840,8 +844,7 @@ private: std::atomic<int> ThreadCount_ = 0; std::atomic<int> ActiveThreads_ = 0; - std::atomic<bool> IsWaitTimeObserverSet_; - TWaitTimeObserver WaitTimeObserver_; + TCallbackList<TWaitTimeObserver::TSignature> WaitTimeObservers_; TProfiler GetPoolProfiler(const TString& poolName) override { @@ -1191,7 +1194,7 @@ private: if (action.BucketHolder) { auto waitTime = CpuDurationToDuration(action.StartedAt - action.EnqueuedAt); action.BucketHolder->Pool->WaitTimeCounter.Record(waitTime); - ReportWaitTime(waitTime); + WaitTimeObservers_.Fire(waitTime); } MaybeRunMaintenance(&threadState, action.StartedAt, /*flush*/ false); @@ -1241,13 +1244,6 @@ private: return std::move(threadState.Action.Callback); } - void ReportWaitTime(TDuration waitTime) - { - if (IsWaitTimeObserverSet_.load()) { - WaitTimeObserver_(waitTime); - } - } - static void MaybeRunMaintenance(TThreadState* threadState, TCpuInstant now, bool flush) { YT_ASSERT(threadState); @@ -1371,9 +1367,14 @@ public: return TThreadPoolBase::GetThreadCount(); } - void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override + { + Queue_->SubscribeWaitTimeObserved(callback); + } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override { - Queue_->RegisterWaitTimeObserver(waitTimeObserver); + Queue_->UnsubscribeWaitTimeObserved(callback); } private: diff --git a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp index 9ca4c824d1..07897b4dc8 100644 --- a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp @@ -74,7 +74,10 @@ struct TBucket return false; } - void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override + { } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override { } ~TBucket(); @@ -298,12 +301,9 @@ public: auto currentInstant = GetCpuInstant(); TBucketPtr bucket; - TWaitTimeObserver waitTimeObserver; - { auto guard = Guard(SpinLock_); bucket = GetStarvingBucket(action); - waitTimeObserver = WaitTimeObserver_; if (!bucket) { return false; @@ -318,8 +318,9 @@ public: bucket->WaitTime = action->StartedAt - action->EnqueuedAt; } - if (waitTimeObserver) { - waitTimeObserver(CpuDurationToDuration(action->StartedAt - action->EnqueuedAt)); + // Microoptimization: avoid calling CpuDurationToDuration if no observers are registered. + if (!WaitTimeObservers_.IsEmpty()) { + WaitTimeObservers_.Fire(CpuDurationToDuration(action->StartedAt - action->EnqueuedAt)); } YT_ASSERT(action && !action->Finished); @@ -393,10 +394,14 @@ public: } } - void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) + void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) { - auto guard = Guard(SpinLock_); - WaitTimeObserver_ = waitTimeObserver; + WaitTimeObservers_.Subscribe(callback); + } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) + { + WaitTimeObservers_.Unsubscribe(callback); } private: @@ -467,7 +472,7 @@ private: std::atomic<int> ThreadCount_ = 0; std::array<TThreadState, TThreadPoolBase::MaxThreadCount> ThreadStates_; - ITwoLevelFairShareThreadPool::TWaitTimeObserver WaitTimeObserver_; + TCallbackList<ITwoLevelFairShareThreadPool::TWaitTimeObserver::TSignature> WaitTimeObservers_; size_t GetLowestEmptyPoolId() @@ -693,9 +698,14 @@ public: TThreadPoolBase::Shutdown(); } - void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override + void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override + { + Queue_->SubscribeWaitTimeObserved(callback); + } + + void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override { - Queue_->RegisterWaitTimeObserver(std::move(waitTimeObserver)); + Queue_->UnsubscribeWaitTimeObserved(callback); } private: diff --git a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h index b465279a66..57d616391b 100644 --- a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h +++ b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h @@ -2,7 +2,7 @@ #include "public.h" -#include <yt/yt/core/actions/public.h> +#include <yt/yt/core/actions/signal.h> namespace NYT::NConcurrency { @@ -31,8 +31,11 @@ struct ITwoLevelFairShareThreadPool virtual void Shutdown() = 0; - using TWaitTimeObserver = std::function<void(TDuration)>; - virtual void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) = 0; + //! Invoked to inform of the current wait time for invocations via this invoker. + //! These invocations, however, are not guaranteed. + using TWaitTimeObserver = TCallback<void(TDuration waitTime)>; + + DECLARE_INTERFACE_SIGNAL(TWaitTimeObserver::TSignature, WaitTimeObserved); }; DEFINE_REFCOUNTED_TYPE(ITwoLevelFairShareThreadPool) |