aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-11-17 14:58:21 +0300
committerbabenko <babenko@yandex-team.com>2024-11-17 15:18:48 +0300
commit2890fd7e50892cfdc4b872b3c3cb2796df0f5efd (patch)
tree85fe5bd35380c1e2f15fa0adbe0209a4101a49fb
parentee390ab348b4e03d0ce004c389eaab654410b740 (diff)
downloadydb-2890fd7e50892cfdc4b872b3c3cb2796df0f5efd.tar.gz
Fix batch version of TInvokerWrapper::Invoke
commit_hash:fdb8d24f36e28a431b24b50b7bfd40f6c5a7b001
-rw-r--r--yt/yt/core/actions/cancelable_context.cpp37
-rw-r--r--yt/yt/core/actions/invoker_detail.cpp10
-rw-r--r--yt/yt/core/actions/invoker_detail.h1
-rw-r--r--yt/yt/core/concurrency/action_queue.cpp26
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_pool.cpp9
-rw-r--r--yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp4
-rw-r--r--yt/yt/core/concurrency/unittests/scheduler_ut.cpp2
7 files changed, 73 insertions, 16 deletions
diff --git a/yt/yt/core/actions/cancelable_context.cpp b/yt/yt/core/actions/cancelable_context.cpp
index ba24beb035..9993b14cf4 100644
--- a/yt/yt/core/actions/cancelable_context.cpp
+++ b/yt/yt/core/actions/cancelable_context.cpp
@@ -25,6 +25,7 @@ public:
void Invoke(TClosure callback) override
{
YT_ASSERT(callback);
+
auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
if (Context_->Canceled_) {
@@ -50,9 +51,43 @@ public:
}));
}
+ void Invoke(TMutableRange<TClosure> callbacks) override
+ {
+ auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
+
+ std::vector<TClosure> capturedCallbacks;
+ capturedCallbacks.reserve(callbacks.size());
+ for (auto& callback : callbacks) {
+ capturedCallbacks.push_back(std::move(callback));
+ }
+
+ if (Context_->Canceled_) {
+ capturedCallbacks.clear();
+ return;
+ }
+
+ return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
+ [
+ this,
+ this_ = MakeStrong(this),
+ capturedCallbacks = std::move(capturedCallbacks)
+ ] () mutable {
+ auto currentTokenGuard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
+
+ if (Context_->Canceled_) {
+ capturedCallbacks.clear();
+ return;
+ }
+
+ TCurrentInvokerGuard guard(this);
+ for (const auto& callback : capturedCallbacks) {
+ callback();
+ }
+ }));
+ }
+
private:
const TCancelableContextPtr Context_;
-
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/actions/invoker_detail.cpp b/yt/yt/core/actions/invoker_detail.cpp
index a36a0a2b93..573165bafb 100644
--- a/yt/yt/core/actions/invoker_detail.cpp
+++ b/yt/yt/core/actions/invoker_detail.cpp
@@ -18,15 +18,11 @@ TInvokerWrapper<VirtualizeBase>::TInvokerWrapper(IInvokerPtr underlyingInvoker)
}
template <bool VirtualizeBase>
-void TInvokerWrapper<VirtualizeBase>::Invoke(TClosure callback)
-{
- return UnderlyingInvoker_->Invoke(std::move(callback));
-}
-
-template <bool VirtualizeBase>
void TInvokerWrapper<VirtualizeBase>::Invoke(TMutableRange<TClosure> callbacks)
{
- return UnderlyingInvoker_->Invoke(callbacks);
+ for (auto& callback : callbacks) {
+ static_cast<IInvoker*>(this)->Invoke(std::move(callback));
+ }
}
template <bool VirtualizeBase>
diff --git a/yt/yt/core/actions/invoker_detail.h b/yt/yt/core/actions/invoker_detail.h
index 075e992d9d..5f5538a96f 100644
--- a/yt/yt/core/actions/invoker_detail.h
+++ b/yt/yt/core/actions/invoker_detail.h
@@ -34,7 +34,6 @@ class TInvokerWrapper
: public NDetail::TMaybeVirtualInvokerBase<VirtualizeBase>
{
public:
- void Invoke(TClosure callback) override;
void Invoke(TMutableRange<TClosure> callbacks) override;
NThreading::TThreadId GetThreadId() const override;
diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp
index 27e8ee6e0d..a597a78466 100644
--- a/yt/yt/core/concurrency/action_queue.cpp
+++ b/yt/yt/core/concurrency/action_queue.cpp
@@ -114,11 +114,16 @@ class TSerializedInvoker
, public TInvokerProfileWrapper
{
public:
- TSerializedInvoker(IInvokerPtr underlyingInvoker, const NProfiling::TTagSet& tagSet, NProfiling::IRegistryImplPtr registry)
+ TSerializedInvoker(
+ IInvokerPtr underlyingInvoker,
+ const NProfiling::TTagSet& tagSet,
+ NProfiling::IRegistryImplPtr registry)
: TInvokerWrapper(std::move(underlyingInvoker))
, TInvokerProfileWrapper(std::move(registry), "/serialized", tagSet)
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
auto wrappedCallback = WrapCallback(std::move(callback));
@@ -174,7 +179,6 @@ private:
private:
TIntrusivePtr<TSerializedInvoker> Owner_;
bool Activated_ = false;
-
};
void TrySchedule(TGuard<NThreading::TSpinLock>&& guard)
@@ -345,7 +349,12 @@ public:
void Invoke(TClosure callback, i64 /*priority*/) override
{
- return UnderlyingInvoker_->Invoke(std::move(callback));
+ Invoke(std::move(callback));
+ }
+
+ void Invoke(TClosure callback) override
+ {
+ UnderlyingInvoker_->Invoke(std::move(callback));
}
};
@@ -378,7 +387,6 @@ public:
private:
const IPrioritizedInvokerPtr UnderlyingInvoker_;
const i64 Priority_;
-
};
IInvokerPtr CreateFixedPriorityInvoker(
@@ -408,6 +416,8 @@ public:
, MaxConcurrentInvocations_(maxConcurrentInvocations)
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
auto guard = Guard(SpinLock_);
@@ -715,6 +725,8 @@ public:
, Codicil_(std::move(codicil))
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
@@ -754,6 +766,8 @@ public:
, Threshold_(DurationToCpuDuration(threshold))
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
@@ -763,8 +777,8 @@ public:
}
private:
- NLogging::TLogger Logger;
- TCpuDuration Threshold_;
+ const NLogging::TLogger Logger;
+ const TCpuDuration Threshold_;
void RunCallback(TClosure callback)
{
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
index f30d4e2616..7e5644710a 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
@@ -549,6 +549,15 @@ private:
}
}
+ void Invoke(TMutableRange<TClosure> callbacks) override
+ {
+ if (auto strongParent = Parent_.Lock()) {
+ for (auto& callback : callbacks) {
+ strongParent->Enqueue(std::move(callback), Index_);
+ }
+ }
+ }
+
private:
const int Index_;
const TWeakPtr<TFairShareInvokerPool> Parent_;
diff --git a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp
index 188f08e436..498f18ecc5 100644
--- a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp
@@ -57,13 +57,15 @@ public:
, InvocationCount_(0)
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
++InvocationCount_;
if (Bounded_ ) {
EXPECT_TRUE(Parent_.Lock());
}
- TInvokerWrapper::Invoke(std::move(callback));
+ UnderlyingInvoker_->Invoke(std::move(callback));
}
void Bound(const IMockInvokerPoolPtr& parent)
diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
index 8b4c5978a9..15320fae90 100644
--- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp
@@ -1075,6 +1075,8 @@ public:
: TInvokerWrapper(std::move(underlyingInvoker))
{ }
+ using TInvokerWrapper::Invoke;
+
void Invoke(TClosure callback) override
{
UnderlyingInvoker_->Invoke(BIND(