diff options
author | babenko <babenko@yandex-team.com> | 2024-11-17 23:58:09 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-11-18 00:13:36 +0300 |
commit | d5c31b6cb395dfac4f612a0ebb277cb350c88a41 (patch) | |
tree | 3ef3965cd8f25d39014261f5f695b5fa268613df | |
parent | a12a5238ca189fadb5eb6ae6a589a4697f17cb25 (diff) | |
download | ydb-d5c31b6cb395dfac4f612a0ebb277cb350c88a41.tar.gz |
Use batch invoke to reduce the number of wakeups in TDelayedExecutor
commit_hash:f82fadcde7021c9fcc92f4c9123e0d9d197a79ed
-rw-r--r-- | yt/yt/core/concurrency/delayed_executor.cpp | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp index 169a86280e..d86e8e1197 100644 --- a/yt/yt/core/concurrency/delayed_executor.cpp +++ b/yt/yt/core/concurrency/delayed_executor.cpp @@ -334,7 +334,10 @@ private: // NB: The callbacks are forwarded to the DelayedExecutor thread to prevent any user-code // from leaking to the Delayed Poller thread, which is, e.g., fiber-unfriendly. auto runAbort = [&] (const TDelayedExecutorEntryPtr& entry) { - RunCallback(entry, /*aborted*/ true); + if (auto callback = TakeCallback(entry)) { + const auto& invoker = entry->Invoker ? entry->Invoker : DelayedInvoker_; + invoker->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*aborted*/ true))); + } }; for (const auto& entry : ScheduledEntries_) { runAbort(entry); @@ -405,32 +408,33 @@ private: } ScheduledCallbacksGauge_.Update(ScheduledEntries_.size()); + THashMap<IInvokerPtr, std::vector<TClosure>> invokerToCallbacks; while (!ScheduledEntries_.empty()) { auto it = ScheduledEntries_.begin(); const auto& entry = *it; + if (entry->Deadline > now + CoalescingInterval) { break; } + if (entry->Deadline + LateWarningThreshold < now) { StaleCallbacksCounter_.Increment(); YT_LOG_DEBUG("Found a late delayed scheduled callback (Deadline: %v, Now: %v)", entry->Deadline, now); } - RunCallback(entry, false); + + if (auto callback = TakeCallback(entry)) { + auto [it, _] = invokerToCallbacks.emplace(std::move(entry->Invoker), std::vector<TClosure>()); + it->second.push_back(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*abort*/ false))); + } + entry->Iterator.reset(); ScheduledEntries_.erase(it); } - } - void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort) - { - if (auto callback = TakeCallback(entry)) { - const auto& invoker = entry->Invoker - ? entry->Invoker - : DelayedInvoker_; - invoker - ->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), abort))); + for (auto& [invoker, callbacks] : invokerToCallbacks) { + (invoker ? invoker : DelayedInvoker_)->Invoke(TMutableRange(callbacks)); } } }; |