summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <[email protected]>2024-11-07 23:40:41 +0300
committerbabenko <[email protected]>2024-11-07 23:51:13 +0300
commita3a9b728cb98deccbc84a2e75d67755832de4813 (patch)
tree1687e8f6e6790c1c66ee129cb67aec9f419ace48
parentbefc5952f9fd80b5daf6b6036ee27031c36e299f (diff)
Quantize large delays in Delayed Executor to reduce the number of wakeups
commit_hash:ef8d1733d0190bbb4988a71171e0a22777ff28c8
-rw-r--r--yt/yt/core/concurrency/delayed_executor.cpp77
1 files changed, 57 insertions, 20 deletions
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp
index d538896bc0c..169a86280ed 100644
--- a/yt/yt/core/concurrency/delayed_executor.cpp
+++ b/yt/yt/core/concurrency/delayed_executor.cpp
@@ -16,6 +16,10 @@ namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
static constexpr auto CoalescingInterval = TDuration::MicroSeconds(100);
+
+static constexpr auto LowPrecisionDelayThreshold = TDuration::MilliSeconds(10);
+static constexpr auto LowPrecisionQuantumUsLog2 = 10; // ~1 ms
+
static constexpr auto LateWarningThreshold = TDuration::Seconds(1);
static constexpr auto& Logger = ConcurrencyLogger;
@@ -164,31 +168,20 @@ public:
TDelayedExecutorCookie Submit(TDelayedCallback callback, TDuration delay, IInvokerPtr invoker)
{
- YT_VERIFY(callback);
- return Submit(
+ return DoSubmit(
std::move(callback),
- delay.ToDeadLine(),
+ delay,
+ GetInstant() + delay,
std::move(invoker));
}
TDelayedExecutorCookie Submit(TDelayedCallback callback, TInstant deadline, IInvokerPtr invoker)
{
- YT_VERIFY(callback);
- auto entry = New<TDelayedExecutorEntry>(std::move(callback), deadline, std::move(invoker));
- PollerThread_->EnqueueSubmission(entry); // <- (a)
-
- std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b)
-
- if (!PollerThread_->Start()) { // <- (c)
- if (auto callback = TakeCallback(entry)) {
- callback(/*aborted*/ true);
- }
-#if defined(_asan_enabled_)
- NSan::MarkAsIntentionallyLeaked(entry.Get());
-#endif
- }
-
- return entry;
+ return DoSubmit(
+ std::move(callback),
+ deadline - GetInstant(),
+ deadline,
+ std::move(invoker));
}
void Cancel(TDelayedExecutorEntryPtr entry)
@@ -370,7 +363,7 @@ private:
void ProcessQueues()
{
- auto now = TInstant::Now();
+ auto now = GetInstant();
{
int submittedCallbacks = 0;
@@ -446,6 +439,50 @@ private:
const TPollerThreadPtr PollerThread_ = New<TPollerThread>();
+ TDelayedExecutorCookie DoSubmit(
+ TDelayedCallback callback,
+ TDuration delay,
+ TInstant deadline,
+ IInvokerPtr invoker)
+ {
+ YT_VERIFY(callback);
+
+ auto adjustedDeadline = GetAdjustedDeadline(delay, deadline);
+ auto entry = New<TDelayedExecutorEntry>(std::move(callback), adjustedDeadline, std::move(invoker));
+ PollerThread_->EnqueueSubmission(entry); // <- (a)
+
+ std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b)
+
+ if (!PollerThread_->Start()) { // <- (c)
+ if (auto callback = TakeCallback(entry)) {
+ callback(/*aborted*/ true);
+ }
+#if defined(_asan_enabled_)
+ NSan::MarkAsIntentionallyLeaked(entry.Get());
+#endif
+ }
+
+ return entry;
+ }
+
+ static TInstant GetAdjustedDeadline(TDuration delay, TInstant deadline)
+ {
+ // Dont mess with small delays.
+ if (delay < LowPrecisionDelayThreshold) {
+ return deadline;
+ }
+
+ // Beware of overflow; e.g. TInstant::Max() is quite common.
+ constexpr TInstant::TValue maskUs = (1ULL << LowPrecisionQuantumUsLog2) - 1;
+ constexpr TInstant::TValue deltaUs = maskUs - 1;
+ if (deadline.MicroSeconds() > Max<TInstant::TValue>() - deltaUs) {
+ return deadline;
+ }
+
+ // Round up to the nearest quantum.
+ return TInstant::MicroSeconds((deadline.MicroSeconds() + deltaUs) & ~maskUs);
+ }
+
static void ClosureToDelayedCallbackAdapter(const TClosure& closure, bool aborted)
{
if (aborted) {