aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-11-17 23:58:09 +0300
committerbabenko <babenko@yandex-team.com>2024-11-18 00:13:36 +0300
commitd5c31b6cb395dfac4f612a0ebb277cb350c88a41 (patch)
tree3ef3965cd8f25d39014261f5f695b5fa268613df
parenta12a5238ca189fadb5eb6ae6a589a4697f17cb25 (diff)
downloadydb-d5c31b6cb395dfac4f612a0ebb277cb350c88a41.tar.gz
Use batch invoke to reduce the number of wakeups in TDelayedExecutor
commit_hash:f82fadcde7021c9fcc92f4c9123e0d9d197a79ed
-rw-r--r--yt/yt/core/concurrency/delayed_executor.cpp26
1 files changed, 15 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp
index 169a86280e..d86e8e1197 100644
--- a/yt/yt/core/concurrency/delayed_executor.cpp
+++ b/yt/yt/core/concurrency/delayed_executor.cpp
@@ -334,7 +334,10 @@ private:
// NB: The callbacks are forwarded to the DelayedExecutor thread to prevent any user-code
// from leaking to the Delayed Poller thread, which is, e.g., fiber-unfriendly.
auto runAbort = [&] (const TDelayedExecutorEntryPtr& entry) {
- RunCallback(entry, /*aborted*/ true);
+ if (auto callback = TakeCallback(entry)) {
+ const auto& invoker = entry->Invoker ? entry->Invoker : DelayedInvoker_;
+ invoker->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*aborted*/ true)));
+ }
};
for (const auto& entry : ScheduledEntries_) {
runAbort(entry);
@@ -405,32 +408,33 @@ private:
}
ScheduledCallbacksGauge_.Update(ScheduledEntries_.size());
+ THashMap<IInvokerPtr, std::vector<TClosure>> invokerToCallbacks;
while (!ScheduledEntries_.empty()) {
auto it = ScheduledEntries_.begin();
const auto& entry = *it;
+
if (entry->Deadline > now + CoalescingInterval) {
break;
}
+
if (entry->Deadline + LateWarningThreshold < now) {
StaleCallbacksCounter_.Increment();
YT_LOG_DEBUG("Found a late delayed scheduled callback (Deadline: %v, Now: %v)",
entry->Deadline,
now);
}
- RunCallback(entry, false);
+
+ if (auto callback = TakeCallback(entry)) {
+ auto [it, _] = invokerToCallbacks.emplace(std::move(entry->Invoker), std::vector<TClosure>());
+ it->second.push_back(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), /*abort*/ false)));
+ }
+
entry->Iterator.reset();
ScheduledEntries_.erase(it);
}
- }
- void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort)
- {
- if (auto callback = TakeCallback(entry)) {
- const auto& invoker = entry->Invoker
- ? entry->Invoker
- : DelayedInvoker_;
- invoker
- ->Invoke(BIND_NO_PROPAGATE(TCallbackGuard(std::move(callback), abort)));
+ for (auto& [invoker, callbacks] : invokerToCallbacks) {
+ (invoker ? invoker : DelayedInvoker_)->Invoke(TMutableRange(callbacks));
}
}
};