diff options
author | babenko <babenko@yandex-team.com> | 2024-11-17 14:58:21 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-11-17 15:18:48 +0300 |
commit | 2890fd7e50892cfdc4b872b3c3cb2796df0f5efd (patch) | |
tree | 85fe5bd35380c1e2f15fa0adbe0209a4101a49fb | |
parent | ee390ab348b4e03d0ce004c389eaab654410b740 (diff) | |
download | ydb-2890fd7e50892cfdc4b872b3c3cb2796df0f5efd.tar.gz |
Fix batch version of TInvokerWrapper::Invoke
commit_hash:fdb8d24f36e28a431b24b50b7bfd40f6c5a7b001
-rw-r--r-- | yt/yt/core/actions/cancelable_context.cpp | 37 | ||||
-rw-r--r-- | yt/yt/core/actions/invoker_detail.cpp | 10 | ||||
-rw-r--r-- | yt/yt/core/actions/invoker_detail.h | 1 | ||||
-rw-r--r-- | yt/yt/core/concurrency/action_queue.cpp | 26 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fair_share_invoker_pool.cpp | 9 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp | 4 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/scheduler_ut.cpp | 2 |
7 files changed, 73 insertions, 16 deletions
diff --git a/yt/yt/core/actions/cancelable_context.cpp b/yt/yt/core/actions/cancelable_context.cpp index ba24beb035..9993b14cf4 100644 --- a/yt/yt/core/actions/cancelable_context.cpp +++ b/yt/yt/core/actions/cancelable_context.cpp @@ -25,6 +25,7 @@ public: void Invoke(TClosure callback) override { YT_ASSERT(callback); + auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); if (Context_->Canceled_) { @@ -50,9 +51,43 @@ public: })); } + void Invoke(TMutableRange<TClosure> callbacks) override + { + auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); + + std::vector<TClosure> capturedCallbacks; + capturedCallbacks.reserve(callbacks.size()); + for (auto& callback : callbacks) { + capturedCallbacks.push_back(std::move(callback)); + } + + if (Context_->Canceled_) { + capturedCallbacks.clear(); + return; + } + + return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( + [ + this, + this_ = MakeStrong(this), + capturedCallbacks = std::move(capturedCallbacks) + ] () mutable { + auto currentTokenGuard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); + + if (Context_->Canceled_) { + capturedCallbacks.clear(); + return; + } + + TCurrentInvokerGuard guard(this); + for (const auto& callback : capturedCallbacks) { + callback(); + } + })); + } + private: const TCancelableContextPtr Context_; - }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/actions/invoker_detail.cpp b/yt/yt/core/actions/invoker_detail.cpp index a36a0a2b93..573165bafb 100644 --- a/yt/yt/core/actions/invoker_detail.cpp +++ b/yt/yt/core/actions/invoker_detail.cpp @@ -18,15 +18,11 @@ TInvokerWrapper<VirtualizeBase>::TInvokerWrapper(IInvokerPtr underlyingInvoker) } template <bool VirtualizeBase> -void TInvokerWrapper<VirtualizeBase>::Invoke(TClosure callback) -{ - return UnderlyingInvoker_->Invoke(std::move(callback)); -} - -template <bool VirtualizeBase> void TInvokerWrapper<VirtualizeBase>::Invoke(TMutableRange<TClosure> callbacks) { - return UnderlyingInvoker_->Invoke(callbacks); + for (auto& callback : callbacks) { + static_cast<IInvoker*>(this)->Invoke(std::move(callback)); + } } template <bool VirtualizeBase> diff --git a/yt/yt/core/actions/invoker_detail.h b/yt/yt/core/actions/invoker_detail.h index 075e992d9d..5f5538a96f 100644 --- a/yt/yt/core/actions/invoker_detail.h +++ b/yt/yt/core/actions/invoker_detail.h @@ -34,7 +34,6 @@ class TInvokerWrapper : public NDetail::TMaybeVirtualInvokerBase<VirtualizeBase> { public: - void Invoke(TClosure callback) override; void Invoke(TMutableRange<TClosure> callbacks) override; NThreading::TThreadId GetThreadId() const override; diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp index 27e8ee6e0d..a597a78466 100644 --- a/yt/yt/core/concurrency/action_queue.cpp +++ b/yt/yt/core/concurrency/action_queue.cpp @@ -114,11 +114,16 @@ class TSerializedInvoker , public TInvokerProfileWrapper { public: - TSerializedInvoker(IInvokerPtr underlyingInvoker, const NProfiling::TTagSet& tagSet, NProfiling::IRegistryImplPtr registry) + TSerializedInvoker( + IInvokerPtr underlyingInvoker, + const NProfiling::TTagSet& tagSet, + NProfiling::IRegistryImplPtr registry) : TInvokerWrapper(std::move(underlyingInvoker)) , TInvokerProfileWrapper(std::move(registry), "/serialized", tagSet) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { auto wrappedCallback = WrapCallback(std::move(callback)); @@ -174,7 +179,6 @@ private: private: TIntrusivePtr<TSerializedInvoker> Owner_; bool Activated_ = false; - }; void TrySchedule(TGuard<NThreading::TSpinLock>&& guard) @@ -345,7 +349,12 @@ public: void Invoke(TClosure callback, i64 /*priority*/) override { - return UnderlyingInvoker_->Invoke(std::move(callback)); + Invoke(std::move(callback)); + } + + void Invoke(TClosure callback) override + { + UnderlyingInvoker_->Invoke(std::move(callback)); } }; @@ -378,7 +387,6 @@ public: private: const IPrioritizedInvokerPtr UnderlyingInvoker_; const i64 Priority_; - }; IInvokerPtr CreateFixedPriorityInvoker( @@ -408,6 +416,8 @@ public: , MaxConcurrentInvocations_(maxConcurrentInvocations) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { auto guard = Guard(SpinLock_); @@ -715,6 +725,8 @@ public: , Codicil_(std::move(codicil)) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( @@ -754,6 +766,8 @@ public: , Threshold_(DurationToCpuDuration(threshold)) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( @@ -763,8 +777,8 @@ public: } private: - NLogging::TLogger Logger; - TCpuDuration Threshold_; + const NLogging::TLogger Logger; + const TCpuDuration Threshold_; void RunCallback(TClosure callback) { diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp index f30d4e2616..7e5644710a 100644 --- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp @@ -549,6 +549,15 @@ private: } } + void Invoke(TMutableRange<TClosure> callbacks) override + { + if (auto strongParent = Parent_.Lock()) { + for (auto& callback : callbacks) { + strongParent->Enqueue(std::move(callback), Index_); + } + } + } + private: const int Index_; const TWeakPtr<TFairShareInvokerPool> Parent_; diff --git a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp index 188f08e436..498f18ecc5 100644 --- a/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp +++ b/yt/yt/core/concurrency/unittests/invoker_pool_ut.cpp @@ -57,13 +57,15 @@ public: , InvocationCount_(0) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { ++InvocationCount_; if (Bounded_ ) { EXPECT_TRUE(Parent_.Lock()); } - TInvokerWrapper::Invoke(std::move(callback)); + UnderlyingInvoker_->Invoke(std::move(callback)); } void Bound(const IMockInvokerPoolPtr& parent) diff --git a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp index 8b4c5978a9..15320fae90 100644 --- a/yt/yt/core/concurrency/unittests/scheduler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/scheduler_ut.cpp @@ -1075,6 +1075,8 @@ public: : TInvokerWrapper(std::move(underlyingInvoker)) { } + using TInvokerWrapper::Invoke; + void Invoke(TClosure callback) override { UnderlyingInvoker_->Invoke(BIND( |