aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2025-01-11 17:52:46 +0300
committerbabenko <babenko@yandex-team.com>2025-01-11 18:13:18 +0300
commit291b8c2ac8281e6511c2ecaf102514e9906b9829 (patch)
treefbc611a102830a6b481719aaf2d88545e0d0a34a
parent53a0b39ec41083775dd7ca2f9bbb4bea1fcd33f3 (diff)
downloadydb-291b8c2ac8281e6511c2ecaf102514e9906b9829.tar.gz
YT-23914: Rewrite wait time observers
In particular, enable multiple observers per invoker. This is a prerequisite for running multiple node instances in multdaemon. commit_hash:d7edfb57d945915770417a66dbcf0a3c34e5bc2e
-rw-r--r--yt/yt/core/actions/callback_list-inl.h (renamed from yt/yt/core/actions/signal-inl.h)33
-rw-r--r--yt/yt/core/actions/callback_list.h179
-rw-r--r--yt/yt/core/actions/invoker.h10
-rw-r--r--yt/yt/core/actions/invoker_detail.cpp10
-rw-r--r--yt/yt/core/actions/invoker_detail.h2
-rw-r--r--yt/yt/core/actions/invoker_util.cpp10
-rw-r--r--yt/yt/core/actions/signal.h169
-rw-r--r--yt/yt/core/concurrency/fair_share_thread_pool.cpp5
-rw-r--r--yt/yt/core/concurrency/invoker_queue.cpp27
-rw-r--r--yt/yt/core/concurrency/invoker_queue.h5
-rw-r--r--yt/yt/core/concurrency/new_fair_share_thread_pool.cpp37
-rw-r--r--yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp34
-rw-r--r--yt/yt/core/concurrency/two_level_fair_share_thread_pool.h9
13 files changed, 297 insertions, 233 deletions
diff --git a/yt/yt/core/actions/signal-inl.h b/yt/yt/core/actions/callback_list-inl.h
index 337e760df8..8e607a43a3 100644
--- a/yt/yt/core/actions/signal-inl.h
+++ b/yt/yt/core/actions/callback_list-inl.h
@@ -1,11 +1,9 @@
-#ifndef SIGNAL_INL_H_
-#error "Direct inclusion of this file is not allowed, include signal.h"
+#ifndef CALLBACK_LIST_INL_H_
+#error "Direct inclusion of this file is not allowed, include callback_list.h"
// For the sake of sane code completion.
-#include "signal.h"
+#include "callback_list.h"
#endif
-#undef SIGNAL_INL_H_
-
-#include <library/cpp/yt/memory/ref.h>
+#undef CALLBACK_LIST_INL_H_
namespace NYT {
@@ -16,6 +14,7 @@ void TCallbackList<TResult(TArgs...)>::Subscribe(const TCallback& callback)
{
auto guard = WriterGuard(SpinLock_);
Callbacks_.push_back(callback);
+ Empty_.store(false);
}
template <class TResult, class... TArgs>
@@ -28,11 +27,15 @@ void TCallbackList<TResult(TArgs...)>::Unsubscribe(const TCallback& callback)
break;
}
}
+ Empty_.store(Callbacks_.empty(), std::memory_order::release);
}
template <class TResult, class... TArgs>
std::vector<TCallback<TResult(TArgs...)>> TCallbackList<TResult(TArgs...)>::ToVector() const
{
+ if (IsEmpty()) [[likely]] {
+ return {};
+ }
auto guard = ReaderGuard(SpinLock_);
return std::vector<TCallback>(Callbacks_.begin(), Callbacks_.end());
}
@@ -40,15 +43,17 @@ std::vector<TCallback<TResult(TArgs...)>> TCallbackList<TResult(TArgs...)>::ToVe
template <class TResult, class... TArgs>
int TCallbackList<TResult(TArgs...)>::Size() const
{
+ if (IsEmpty()) [[likely]] {
+ return 0;
+ }
auto guard = ReaderGuard(SpinLock_);
return Callbacks_.size();
}
template <class TResult, class... TArgs>
-bool TCallbackList<TResult(TArgs...)>::Empty() const
+bool TCallbackList<TResult(TArgs...)>::IsEmpty() const
{
- auto guard = ReaderGuard(SpinLock_);
- return Callbacks_.empty();
+ return Empty_.load(std::memory_order::acquire);
}
template <class TResult, class... TArgs>
@@ -56,12 +61,17 @@ void TCallbackList<TResult(TArgs...)>::Clear()
{
auto guard = WriterGuard(SpinLock_);
Callbacks_.clear();
+ Empty_.store(true, std::memory_order::release);
}
template <class TResult, class... TArgs>
template <class... TCallArgs>
void TCallbackList<TResult(TArgs...)>::Fire(TCallArgs&&... args) const
{
+ if (IsEmpty()) [[likely]] {
+ return;
+ }
+
TCallbackVector callbacks;
{
auto guard = ReaderGuard(SpinLock_);
@@ -77,10 +87,15 @@ template <class TResult, class... TArgs>
template <class... TCallArgs>
void TCallbackList<TResult(TArgs...)>::FireAndClear(TCallArgs&&... args)
{
+ if (IsEmpty()) [[likely]] {
+ return;
+ }
+
TCallbackVector callbacks;
{
auto guard = WriterGuard(SpinLock_);
callbacks.swap(Callbacks_);
+ Empty_.store(true, std::memory_order::release);
}
for (const auto& callback : callbacks) {
diff --git a/yt/yt/core/actions/callback_list.h b/yt/yt/core/actions/callback_list.h
new file mode 100644
index 0000000000..3de80024ab
--- /dev/null
+++ b/yt/yt/core/actions/callback_list.h
@@ -0,0 +1,179 @@
+#pragma once
+
+#include "callback.h"
+
+#include <library/cpp/yt/small_containers/compact_vector.h>
+
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+/*!
+ * A client may subscribe to a list (adding a new handler to it),
+ * unsubscribe from it (removing an earlier added handler),
+ * and fire it thus invoking the callbacks added so far.
+ *
+ * Thread affinity: any.
+ */
+template <class TSignature>
+class TCallbackList
+{ };
+
+template <class TResult, class... TArgs>
+class TCallbackList<TResult(TArgs...)>
+{
+public:
+ using TCallback = NYT::TCallback<TResult(TArgs...)>;
+
+ //! Adds a new handler to the list.
+ /*!
+ * \param callback A handler to be added.
+ */
+ void Subscribe(const TCallback& callback);
+
+ //! Removes a handler from the list.
+ /*!
+ * \param callback A handler to be removed.
+ */
+ void Unsubscribe(const TCallback& callback);
+
+ //! Returns the vector of currently added callbacks.
+ std::vector<TCallback> ToVector() const;
+
+ //! Returns the number of handlers.
+ int Size() const;
+
+ //! Returns |true| if there are no handlers.
+ bool IsEmpty() const;
+
+ //! Clears the list of handlers.
+ void Clear();
+
+ //! Runs all handlers in the list.
+ //! The return values (if any) are ignored.
+ template <class... TCallArgs>
+ void Fire(TCallArgs&&... args) const;
+
+ //! Runs all handlers in the list and clears the list.
+ //! The return values (if any) are ignored.
+ template <class... TCallArgs>
+ void FireAndClear(TCallArgs&&... args);
+
+private:
+ std::atomic<bool> Empty_ = true;
+
+ YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_);
+ using TCallbackVector = TCompactVector<TCallback, 4>;
+ TCallbackVector Callbacks_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/*!
+ * Similar to TCallbackList, but single-threaded and copyable.
+ *
+ * Cannot be used from multiple threads.
+ */
+template <class TSignature>
+class TSimpleCallbackList
+{ };
+
+template <class TResult, class... TArgs>
+class TSimpleCallbackList<TResult(TArgs...)>
+{
+public:
+ using TCallback = NYT::TCallback<TResult(TArgs...)>;
+
+ //! Adds a new handler to the list.
+ /*!
+ * \param callback A handler to be added.
+ */
+ void Subscribe(const TCallback& callback);
+
+ //! Removes a handler from the list.
+ /*!
+ * \param callback A handler to be removed.
+ */
+ void Unsubscribe(const TCallback& callback);
+
+ //! Runs all handlers in the list.
+ //! The return values (if any) are ignored.
+ template <class... TCallArgs>
+ void Fire(TCallArgs&&... args) const;
+
+private:
+ using TCallbackVector = TCompactVector<TCallback, 4>;
+ TCallbackVector Callbacks_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+/*!
+ * Similar to TCallbackList but can only be fired once.
+ * When fired, captures the arguments and in subsequent calls
+ * to Subscribe instantly invokes the subscribers.
+ *
+ * Thread affinity: any.
+ */
+template <class TSignature>
+class TSingleShotCallbackList
+{ };
+
+template <class TResult, class... TArgs>
+class TSingleShotCallbackList<TResult(TArgs...)>
+{
+public:
+ using TCallback = NYT::TCallback<TResult(TArgs...)>;
+
+ //! Adds a new handler to the list.
+ /*!
+ * If the list was already fired then #callback is invoked in situ.
+ * \param callback A handler to be added.
+ */
+ void Subscribe(const TCallback& callback);
+
+ //! Tries to add a new handler to the list.
+ /*!
+ * If the list was already fired then returns |false|.
+ * Otherwise atomically installs the handler.
+ * \param callback A handler to be added.
+ */
+ bool TrySubscribe(const TCallback& callback);
+
+ //! Removes a handler from the list.
+ /*!
+ * \param callback A handler to be removed.
+ */
+ void Unsubscribe(const TCallback& callback);
+
+ //! Returns the vector of currently added callbacks.
+ std::vector<TCallback> ToVector() const;
+
+ //! Runs all handlers in the list.
+ //! The return values (if any) are ignored.
+ /*!
+ * \returns |true| if this is the first attempt to fire the list.
+ */
+ template <class... TCallArgs>
+ bool Fire(TCallArgs&&... args);
+
+ //! \returns |true| if the list was fired.
+ bool IsFired() const;
+
+private:
+ YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_);
+ std::atomic<bool> Fired_ = false;
+ using TCallbackVector = TCompactVector<TCallback, 4>;
+ TCallbackVector Callbacks_;
+ std::tuple<typename std::decay<TArgs>::type...> Args_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define CALLBACK_LIST_INL_H_
+#include "callback_list-inl.h"
+#undef CALLBACK_LIST_INL_H_
diff --git a/yt/yt/core/actions/invoker.h b/yt/yt/core/actions/invoker.h
index 21128f0c4f..ca0d33ccae 100644
--- a/yt/yt/core/actions/invoker.h
+++ b/yt/yt/core/actions/invoker.h
@@ -1,8 +1,10 @@
#pragma once
#include "callback.h"
+#include "signal.h"
#include <yt/yt/core/threading/public.h>
+
#include <library/cpp/yt/memory/range.h>
#include <type_traits>
@@ -38,11 +40,11 @@ struct IInvoker
*/
virtual bool IsSerialized() const = 0;
- using TWaitTimeObserver = std::function<void(TDuration waitTime)>;
- //! Registers a callback that could be invoked to inform
- //! of the current wait time for invocations via this invoker.
+ //! Invoked to inform of the current wait time for invocations via this invoker.
//! These invocations, however, are not guaranteed.
- virtual void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) = 0;
+ using TWaitTimeObserver = TCallback<void(TDuration waitTime)>;
+
+ DECLARE_INTERFACE_SIGNAL(TWaitTimeObserver::TSignature, WaitTimeObserved);
};
DEFINE_REFCOUNTED_TYPE(IInvoker)
diff --git a/yt/yt/core/actions/invoker_detail.cpp b/yt/yt/core/actions/invoker_detail.cpp
index d33514ae46..fc0fd7b308 100644
--- a/yt/yt/core/actions/invoker_detail.cpp
+++ b/yt/yt/core/actions/invoker_detail.cpp
@@ -46,9 +46,15 @@ bool TInvokerWrapper<VirtualizeBase>::IsSerialized() const
}
template <bool VirtualizeBase>
-void TInvokerWrapper<VirtualizeBase>::RegisterWaitTimeObserver(IInvoker::TWaitTimeObserver waitTimeObserver)
+void TInvokerWrapper<VirtualizeBase>::SubscribeWaitTimeObserved(const IInvoker::TWaitTimeObserver& callback)
{
- return UnderlyingInvoker_->RegisterWaitTimeObserver(waitTimeObserver);
+ return UnderlyingInvoker_->SubscribeWaitTimeObserved(callback);
+}
+
+template <bool VirtualizeBase>
+void TInvokerWrapper<VirtualizeBase>::UnsubscribeWaitTimeObserved(const IInvoker::TWaitTimeObserver& callback)
+{
+ return UnderlyingInvoker_->SubscribeWaitTimeObserved(callback);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/actions/invoker_detail.h b/yt/yt/core/actions/invoker_detail.h
index c031ac0dc7..1f662dccd4 100644
--- a/yt/yt/core/actions/invoker_detail.h
+++ b/yt/yt/core/actions/invoker_detail.h
@@ -39,7 +39,7 @@ public:
NThreading::TThreadId GetThreadId() const override;
bool CheckAffinity(const IInvokerPtr& invoker) const override;
bool IsSerialized() const override;
- void RegisterWaitTimeObserver(IInvoker::TWaitTimeObserver waitTimeObserver) override;
+ DECLARE_SIGNAL_OVERRIDE(IInvoker::TWaitTimeObserver::TSignature, WaitTimeObserved);
protected:
const IInvokerPtr UnderlyingInvoker_;
diff --git a/yt/yt/core/actions/invoker_util.cpp b/yt/yt/core/actions/invoker_util.cpp
index a50fb6a1c4..a4f47afb24 100644
--- a/yt/yt/core/actions/invoker_util.cpp
+++ b/yt/yt/core/actions/invoker_util.cpp
@@ -83,7 +83,10 @@ public:
return InvalidThreadId;
}
- void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
+ { }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
{ }
private:
@@ -128,7 +131,10 @@ public:
return InvalidThreadId;
}
- void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
+ { }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
{ }
};
diff --git a/yt/yt/core/actions/signal.h b/yt/yt/core/actions/signal.h
index b46615c415..395249ad5d 100644
--- a/yt/yt/core/actions/signal.h
+++ b/yt/yt/core/actions/signal.h
@@ -1,175 +1,12 @@
#pragma once
#include "callback.h"
-
-#include <library/cpp/yt/small_containers/compact_vector.h>
-
-#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include "callback_list.h"
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
-/*!
- * A client may subscribe to a list (adding a new handler to it),
- * unsubscribe from it (removing an earlier added handler),
- * and fire it thus invoking the callbacks added so far.
- *
- * Thread affinity: any.
- */
-template <class TSignature>
-class TCallbackList
-{ };
-
-template <class TResult, class... TArgs>
-class TCallbackList<TResult(TArgs...)>
-{
-public:
- using TCallback = NYT::TCallback<TResult(TArgs...)>;
-
- //! Adds a new handler to the list.
- /*!
- * \param callback A handler to be added.
- */
- void Subscribe(const TCallback& callback);
-
- //! Removes a handler from the list.
- /*!
- * \param callback A handler to be removed.
- */
- void Unsubscribe(const TCallback& callback);
-
- //! Returns the vector of currently added callbacks.
- std::vector<TCallback> ToVector() const;
-
- //! Returns the number of handlers.
- int Size() const;
-
- //! Returns |true| if there are no handlers.
- bool Empty() const;
-
- //! Clears the list of handlers.
- void Clear();
-
- //! Runs all handlers in the list.
- //! The return values (if any) are ignored.
- template <class... TCallArgs>
- void Fire(TCallArgs&&... args) const;
-
- //! Runs all handlers in the list and clears the list.
- //! The return values (if any) are ignored.
- template <class... TCallArgs>
- void FireAndClear(TCallArgs&&... args);
-
-private:
- YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_);
- using TCallbackVector = TCompactVector<TCallback, 4>;
- TCallbackVector Callbacks_;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-/*!
- * Similar to TCallbackList, but single-threaded and copyable.
- *
- * Cannot be used from multiple threads.
- */
-template <class TSignature>
-class TSimpleCallbackList
-{ };
-
-template <class TResult, class... TArgs>
-class TSimpleCallbackList<TResult(TArgs...)>
-{
-public:
- using TCallback = NYT::TCallback<TResult(TArgs...)>;
-
- //! Adds a new handler to the list.
- /*!
- * \param callback A handler to be added.
- */
- void Subscribe(const TCallback& callback);
-
- //! Removes a handler from the list.
- /*!
- * \param callback A handler to be removed.
- */
- void Unsubscribe(const TCallback& callback);
-
- //! Runs all handlers in the list.
- //! The return values (if any) are ignored.
- template <class... TCallArgs>
- void Fire(TCallArgs&&... args) const;
-
-private:
- using TCallbackVector = TCompactVector<TCallback, 4>;
- TCallbackVector Callbacks_;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-/*!
- * Similar to TCallbackList but can only be fired once.
- * When fired, captures the arguments and in subsequent calls
- * to Subscribe instantly invokes the subscribers.
- *
- * Thread affinity: any.
- */
-template <class TSignature>
-class TSingleShotCallbackList
-{ };
-
-template <class TResult, class... TArgs>
-class TSingleShotCallbackList<TResult(TArgs...)>
-{
-public:
- using TCallback = NYT::TCallback<TResult(TArgs...)>;
-
- //! Adds a new handler to the list.
- /*!
- * If the list was already fired then #callback is invoked in situ.
- * \param callback A handler to be added.
- */
- void Subscribe(const TCallback& callback);
-
- //! Tries to add a new handler to the list.
- /*!
- * If the list was already fired then returns |false|.
- * Otherwise atomically installs the handler.
- * \param callback A handler to be added.
- */
- bool TrySubscribe(const TCallback& callback);
-
- //! Removes a handler from the list.
- /*!
- * \param callback A handler to be removed.
- */
- void Unsubscribe(const TCallback& callback);
-
- //! Returns the vector of currently added callbacks.
- std::vector<TCallback> ToVector() const;
-
- //! Runs all handlers in the list.
- //! The return values (if any) are ignored.
- /*!
- * \returns |true| if this is the first attempt to fire the list.
- */
- template <class... TCallArgs>
- bool Fire(TCallArgs&&... args);
-
- //! \returns |true| if the list was fired.
- bool IsFired() const;
-
-private:
- YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, SpinLock_);
- std::atomic<bool> Fired_ = false;
- using TCallbackVector = TCompactVector<TCallback, 4>;
- TCallbackVector Callbacks_;
- std::tuple<typename std::decay<TArgs>::type...> Args_;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
#define DEFINE_SIGNAL(TSignature, name) \
protected: \
::NYT::TCallbackList<TSignature> name##_; \
@@ -266,7 +103,3 @@ public: \
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
-
-#define SIGNAL_INL_H_
-#include "signal-inl.h"
-#undef SIGNAL_INL_H_
diff --git a/yt/yt/core/concurrency/fair_share_thread_pool.cpp b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
index a5a4d9285d..fb8dec13cf 100644
--- a/yt/yt/core/concurrency/fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
@@ -71,7 +71,10 @@ public:
return false;
}
- void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
+ { }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
{ }
~TBucket();
diff --git a/yt/yt/core/concurrency/invoker_queue.cpp b/yt/yt/core/concurrency/invoker_queue.cpp
index 019589501a..eb871622f7 100644
--- a/yt/yt/core/concurrency/invoker_queue.cpp
+++ b/yt/yt/core/concurrency/invoker_queue.cpp
@@ -269,10 +269,17 @@ public:
}
}
- void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override
{
if (auto queue = Queue_.Lock()) {
- queue->RegisterWaitTimeObserver(waitTimeObserver);
+ queue->SubscribeWaitTimeObserved(callback);
+ }
+ }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override
+ {
+ if (auto queue = Queue_.Lock()) {
+ queue->UnsubscribeWaitTimeObserved(callback);
}
}
@@ -597,9 +604,7 @@ bool TInvokerQueue<TQueueImpl>::BeginExecute(TEnqueuedAction* action, typename T
auto waitTime = CpuDurationToDuration(action->StartedAt - action->EnqueuedAt);
- if (IsWaitTimeObserverSet_.load()) {
- WaitTimeObserver_(waitTime);
- }
+ WaitTimeObserved_.Fire(waitTime);
if (Counters_[action->ProfilingTag]) {
Counters_[action->ProfilingTag]->DequeuedCounter.Increment();
@@ -676,13 +681,15 @@ IInvoker* TInvokerQueue<TQueueImpl>::GetProfilingTagSettingInvoker(int profiling
}
template <class TQueueImpl>
-void TInvokerQueue<TQueueImpl>::RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver)
+void TInvokerQueue<TQueueImpl>::SubscribeWaitTimeObserved(const TWaitTimeObserver& callback)
{
- WaitTimeObserver_ = waitTimeObserver;
- auto alreadyInitialized = IsWaitTimeObserverSet_.exchange(true);
+ WaitTimeObserved_.Subscribe(callback);
+}
- // Multiple observers are forbidden.
- YT_VERIFY(!alreadyInitialized);
+template <class TQueueImpl>
+void TInvokerQueue<TQueueImpl>::UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback)
+{
+ WaitTimeObserved_.Unsubscribe(callback);
}
template <class TQueueImpl>
diff --git a/yt/yt/core/concurrency/invoker_queue.h b/yt/yt/core/concurrency/invoker_queue.h
index e4a978aeae..95719e28fa 100644
--- a/yt/yt/core/concurrency/invoker_queue.h
+++ b/yt/yt/core/concurrency/invoker_queue.h
@@ -192,7 +192,7 @@ public:
IInvoker* GetProfilingTagSettingInvoker(int profilingTag);
- void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override;
+ DECLARE_SIGNAL_OVERRIDE(TWaitTimeObserver::TSignature, WaitTimeObserved);
private:
const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_;
@@ -220,8 +220,7 @@ private:
std::vector<IInvokerPtr> ProfilingTagSettingInvokers_;
- std::atomic<bool> IsWaitTimeObserverSet_;
- TWaitTimeObserver WaitTimeObserver_;
+ TCallbackList<TWaitTimeObserver::TSignature> WaitTimeObserved_;
TCountersPtr CreateCounters(const NProfiling::TTagSet& tagSet, NProfiling::IRegistryPtr registry);
diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
index 0c8334f323..fec716623d 100644
--- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp
@@ -366,7 +366,10 @@ public:
return invoker.Get() == this;
}
- void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
+ { }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
{ }
private:
@@ -788,13 +791,14 @@ public:
}
- void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver)
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback)
{
- WaitTimeObserver_ = waitTimeObserver;
- auto alreadyInitialized = IsWaitTimeObserverSet_.exchange(true);
+ WaitTimeObservers_.Subscribe(callback);
+ }
- // Multiple observers are forbidden.
- YT_VERIFY(!alreadyInitialized);
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback)
+ {
+ WaitTimeObservers_.Unsubscribe(callback);
}
private:
@@ -840,8 +844,7 @@ private:
std::atomic<int> ThreadCount_ = 0;
std::atomic<int> ActiveThreads_ = 0;
- std::atomic<bool> IsWaitTimeObserverSet_;
- TWaitTimeObserver WaitTimeObserver_;
+ TCallbackList<TWaitTimeObserver::TSignature> WaitTimeObservers_;
TProfiler GetPoolProfiler(const TString& poolName) override
{
@@ -1191,7 +1194,7 @@ private:
if (action.BucketHolder) {
auto waitTime = CpuDurationToDuration(action.StartedAt - action.EnqueuedAt);
action.BucketHolder->Pool->WaitTimeCounter.Record(waitTime);
- ReportWaitTime(waitTime);
+ WaitTimeObservers_.Fire(waitTime);
}
MaybeRunMaintenance(&threadState, action.StartedAt, /*flush*/ false);
@@ -1241,13 +1244,6 @@ private:
return std::move(threadState.Action.Callback);
}
- void ReportWaitTime(TDuration waitTime)
- {
- if (IsWaitTimeObserverSet_.load()) {
- WaitTimeObserver_(waitTime);
- }
- }
-
static void MaybeRunMaintenance(TThreadState* threadState, TCpuInstant now, bool flush)
{
YT_ASSERT(threadState);
@@ -1371,9 +1367,14 @@ public:
return TThreadPoolBase::GetThreadCount();
}
- void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override
+ {
+ Queue_->SubscribeWaitTimeObserved(callback);
+ }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override
{
- Queue_->RegisterWaitTimeObserver(waitTimeObserver);
+ Queue_->UnsubscribeWaitTimeObserved(callback);
}
private:
diff --git a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
index 9ca4c824d1..07897b4dc8 100644
--- a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp
@@ -74,7 +74,10 @@ struct TBucket
return false;
}
- void RegisterWaitTimeObserver(TWaitTimeObserver /*waitTimeObserver*/) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
+ { }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& /*callback*/) override
{ }
~TBucket();
@@ -298,12 +301,9 @@ public:
auto currentInstant = GetCpuInstant();
TBucketPtr bucket;
- TWaitTimeObserver waitTimeObserver;
-
{
auto guard = Guard(SpinLock_);
bucket = GetStarvingBucket(action);
- waitTimeObserver = WaitTimeObserver_;
if (!bucket) {
return false;
@@ -318,8 +318,9 @@ public:
bucket->WaitTime = action->StartedAt - action->EnqueuedAt;
}
- if (waitTimeObserver) {
- waitTimeObserver(CpuDurationToDuration(action->StartedAt - action->EnqueuedAt));
+ // Microoptimization: avoid calling CpuDurationToDuration if no observers are registered.
+ if (!WaitTimeObservers_.IsEmpty()) {
+ WaitTimeObservers_.Fire(CpuDurationToDuration(action->StartedAt - action->EnqueuedAt));
}
YT_ASSERT(action && !action->Finished);
@@ -393,10 +394,14 @@ public:
}
}
- void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver)
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback)
{
- auto guard = Guard(SpinLock_);
- WaitTimeObserver_ = waitTimeObserver;
+ WaitTimeObservers_.Subscribe(callback);
+ }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback)
+ {
+ WaitTimeObservers_.Unsubscribe(callback);
}
private:
@@ -467,7 +472,7 @@ private:
std::atomic<int> ThreadCount_ = 0;
std::array<TThreadState, TThreadPoolBase::MaxThreadCount> ThreadStates_;
- ITwoLevelFairShareThreadPool::TWaitTimeObserver WaitTimeObserver_;
+ TCallbackList<ITwoLevelFairShareThreadPool::TWaitTimeObserver::TSignature> WaitTimeObservers_;
size_t GetLowestEmptyPoolId()
@@ -693,9 +698,14 @@ public:
TThreadPoolBase::Shutdown();
}
- void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) override
+ void SubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override
+ {
+ Queue_->SubscribeWaitTimeObserved(callback);
+ }
+
+ void UnsubscribeWaitTimeObserved(const TWaitTimeObserver& callback) override
{
- Queue_->RegisterWaitTimeObserver(std::move(waitTimeObserver));
+ Queue_->UnsubscribeWaitTimeObserved(callback);
}
private:
diff --git a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h
index b465279a66..57d616391b 100644
--- a/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h
+++ b/yt/yt/core/concurrency/two_level_fair_share_thread_pool.h
@@ -2,7 +2,7 @@
#include "public.h"
-#include <yt/yt/core/actions/public.h>
+#include <yt/yt/core/actions/signal.h>
namespace NYT::NConcurrency {
@@ -31,8 +31,11 @@ struct ITwoLevelFairShareThreadPool
virtual void Shutdown() = 0;
- using TWaitTimeObserver = std::function<void(TDuration)>;
- virtual void RegisterWaitTimeObserver(TWaitTimeObserver waitTimeObserver) = 0;
+ //! Invoked to inform of the current wait time for invocations via this invoker.
+ //! These invocations, however, are not guaranteed.
+ using TWaitTimeObserver = TCallback<void(TDuration waitTime)>;
+
+ DECLARE_INTERFACE_SIGNAL(TWaitTimeObserver::TSignature, WaitTimeObserved);
};
DEFINE_REFCOUNTED_TYPE(ITwoLevelFairShareThreadPool)