aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-11-07 14:08:05 +0300
committerbabenko <babenko@yandex-team.com>2024-11-07 14:49:01 +0300
commitd68fb6554629a8c1a0a61b5d23a6a99822d361e3 (patch)
tree98c2c8f1fd9935c18967f73109933e7698c6da1e /yt
parent0b8554f88de14184989f9279afcb7d1dc859d19d (diff)
downloadydb-d68fb6554629a8c1a0a61b5d23a6a99822d361e3.tar.gz
Fix batch Invoke for suspendable invoker
commit_hash:87fc98c02115686ae005e61f7a0c0076ed811ba7
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/actions/invoker_detail.h5
-rw-r--r--yt/yt/core/concurrency/action_queue.cpp6
-rw-r--r--yt/yt/core/concurrency/invoker_queue.h1
3 files changed, 8 insertions, 4 deletions
diff --git a/yt/yt/core/actions/invoker_detail.h b/yt/yt/core/actions/invoker_detail.h
index 6556605779..075e992d9d 100644
--- a/yt/yt/core/actions/invoker_detail.h
+++ b/yt/yt/core/actions/invoker_detail.h
@@ -35,7 +35,6 @@ class TInvokerWrapper
{
public:
void Invoke(TClosure callback) override;
-
void Invoke(TMutableRange<TClosure> callbacks) override;
NThreading::TThreadId GetThreadId() const override;
@@ -44,9 +43,9 @@ public:
void RegisterWaitTimeObserver(IInvoker::TWaitTimeObserver waitTimeObserver) override;
protected:
- explicit TInvokerWrapper(IInvokerPtr underlyingInvoker);
+ const IInvokerPtr UnderlyingInvoker_;
- IInvokerPtr UnderlyingInvoker_;
+ explicit TInvokerWrapper(IInvokerPtr underlyingInvoker);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp
index 819d8043bf..27e8ee6e0d 100644
--- a/yt/yt/core/concurrency/action_queue.cpp
+++ b/yt/yt/core/concurrency/action_queue.cpp
@@ -573,6 +573,12 @@ public:
ScheduleMore();
}
+ void Invoke(TMutableRange<TClosure> callbacks) override
+ {
+ Queue_.EnqueueAll(std::move(callbacks));
+ ScheduleMore();
+ }
+
TFuture<void> Suspend() override
{
YT_VERIFY(!Suspended_.exchange(true));
diff --git a/yt/yt/core/concurrency/invoker_queue.h b/yt/yt/core/concurrency/invoker_queue.h
index 0dea9c30b8..d32a5c9cff 100644
--- a/yt/yt/core/concurrency/invoker_queue.h
+++ b/yt/yt/core/concurrency/invoker_queue.h
@@ -139,7 +139,6 @@ public:
void SetThreadId(NThreading::TThreadId threadId);
void Invoke(TClosure callback) override;
-
void Invoke(TMutableRange<TClosure> callbacks) override;
void Invoke(