diff options
| author | babenko <[email protected]> | 2024-11-07 23:40:41 +0300 |
|---|---|---|
| committer | babenko <[email protected]> | 2024-11-07 23:51:13 +0300 |
| commit | a3a9b728cb98deccbc84a2e75d67755832de4813 (patch) | |
| tree | 1687e8f6e6790c1c66ee129cb67aec9f419ace48 | |
| parent | befc5952f9fd80b5daf6b6036ee27031c36e299f (diff) | |
Quantize large delays in Delayed Executor to reduce the number of wakeups
commit_hash:ef8d1733d0190bbb4988a71171e0a22777ff28c8
| -rw-r--r-- | yt/yt/core/concurrency/delayed_executor.cpp | 77 |
1 files changed, 57 insertions, 20 deletions
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp index d538896bc0c..169a86280ed 100644 --- a/yt/yt/core/concurrency/delayed_executor.cpp +++ b/yt/yt/core/concurrency/delayed_executor.cpp @@ -16,6 +16,10 @@ namespace NYT::NConcurrency { //////////////////////////////////////////////////////////////////////////////// static constexpr auto CoalescingInterval = TDuration::MicroSeconds(100); + +static constexpr auto LowPrecisionDelayThreshold = TDuration::MilliSeconds(10); +static constexpr auto LowPrecisionQuantumUsLog2 = 10; // ~1 ms + static constexpr auto LateWarningThreshold = TDuration::Seconds(1); static constexpr auto& Logger = ConcurrencyLogger; @@ -164,31 +168,20 @@ public: TDelayedExecutorCookie Submit(TDelayedCallback callback, TDuration delay, IInvokerPtr invoker) { - YT_VERIFY(callback); - return Submit( + return DoSubmit( std::move(callback), - delay.ToDeadLine(), + delay, + GetInstant() + delay, std::move(invoker)); } TDelayedExecutorCookie Submit(TDelayedCallback callback, TInstant deadline, IInvokerPtr invoker) { - YT_VERIFY(callback); - auto entry = New<TDelayedExecutorEntry>(std::move(callback), deadline, std::move(invoker)); - PollerThread_->EnqueueSubmission(entry); // <- (a) - - std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b) - - if (!PollerThread_->Start()) { // <- (c) - if (auto callback = TakeCallback(entry)) { - callback(/*aborted*/ true); - } -#if defined(_asan_enabled_) - NSan::MarkAsIntentionallyLeaked(entry.Get()); -#endif - } - - return entry; + return DoSubmit( + std::move(callback), + deadline - GetInstant(), + deadline, + std::move(invoker)); } void Cancel(TDelayedExecutorEntryPtr entry) @@ -370,7 +363,7 @@ private: void ProcessQueues() { - auto now = TInstant::Now(); + auto now = GetInstant(); { int submittedCallbacks = 0; @@ -446,6 +439,50 @@ private: const TPollerThreadPtr PollerThread_ = New<TPollerThread>(); + TDelayedExecutorCookie DoSubmit( + TDelayedCallback callback, + TDuration delay, + TInstant deadline, + IInvokerPtr invoker) + { + YT_VERIFY(callback); + + auto adjustedDeadline = GetAdjustedDeadline(delay, deadline); + auto entry = New<TDelayedExecutorEntry>(std::move(callback), adjustedDeadline, std::move(invoker)); + PollerThread_->EnqueueSubmission(entry); // <- (a) + + std::atomic_thread_fence(std::memory_order::seq_cst); // <- (b) + + if (!PollerThread_->Start()) { // <- (c) + if (auto callback = TakeCallback(entry)) { + callback(/*aborted*/ true); + } +#if defined(_asan_enabled_) + NSan::MarkAsIntentionallyLeaked(entry.Get()); +#endif + } + + return entry; + } + + static TInstant GetAdjustedDeadline(TDuration delay, TInstant deadline) + { + // Dont mess with small delays. + if (delay < LowPrecisionDelayThreshold) { + return deadline; + } + + // Beware of overflow; e.g. TInstant::Max() is quite common. + constexpr TInstant::TValue maskUs = (1ULL << LowPrecisionQuantumUsLog2) - 1; + constexpr TInstant::TValue deltaUs = maskUs - 1; + if (deadline.MicroSeconds() > Max<TInstant::TValue>() - deltaUs) { + return deadline; + } + + // Round up to the nearest quantum. + return TInstant::MicroSeconds((deadline.MicroSeconds() + deltaUs) & ~maskUs); + } + static void ClosureToDelayedCallbackAdapter(const TClosure& closure, bool aborted) { if (aborted) { |
