diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-10-02 11:06:19 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-10-02 11:21:30 +0300 |
commit | a830a77de6483eb53f266f2161931315324cacca (patch) | |
tree | b0d7cfae4c8b34f1f5851528d3866042215b2cf1 /yt | |
parent | 2cfac96412e943de3a142da74b90da74fece6920 (diff) | |
download | ydb-a830a77de6483eb53f266f2161931315324cacca.tar.gz |
YT-12720: Capture cancelation error in some new cases
commit_hash:db414aec089ec1f05bb59c309b763c4e403bae9e
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/actions/cancelable_context.cpp | 34 | ||||
-rw-r--r-- | yt/yt/core/actions/cancelable_context.h | 3 | ||||
-rw-r--r-- | yt/yt/core/actions/cancelation_token-inl.h | 142 | ||||
-rw-r--r-- | yt/yt/core/actions/cancelation_token.cpp | 220 | ||||
-rw-r--r-- | yt/yt/core/actions/cancelation_token.h | 151 | ||||
-rw-r--r-- | yt/yt/core/actions/future-inl.h | 91 | ||||
-rw-r--r-- | yt/yt/core/actions/future.cpp | 14 | ||||
-rw-r--r-- | yt/yt/core/actions/future.h | 10 | ||||
-rw-r--r-- | yt/yt/core/actions/unittests/cancelation_token_ut.cpp | 148 | ||||
-rw-r--r-- | yt/yt/core/actions/unittests/future_ut.cpp | 105 | ||||
-rw-r--r-- | yt/yt/core/actions/unittests/ya.make | 1 | ||||
-rw-r--r-- | yt/yt/core/ya.make | 1 |
12 files changed, 878 insertions, 42 deletions
diff --git a/yt/yt/core/actions/cancelable_context.cpp b/yt/yt/core/actions/cancelable_context.cpp index c04b6c27b0..cb3502d530 100644 --- a/yt/yt/core/actions/cancelable_context.cpp +++ b/yt/yt/core/actions/cancelable_context.cpp @@ -25,19 +25,29 @@ public: void Invoke(TClosure callback) override { YT_ASSERT(callback); + auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); if (Context_->Canceled_) { + callback.Reset(); return; } - return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE([this, this_ = MakeStrong(this), callback = std::move(callback)] { - if (Context_->Canceled_) { - return; - } - - TCurrentInvokerGuard guard(this); - callback(); - })); + return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( + [ + this, + this_ = MakeStrong(this), + callback = std::move(callback) + ] () mutable { + auto currentTokenGuard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_); + + if (Context_->Canceled_) { + callback.Reset(); + return; + } + + TCurrentInvokerGuard guard(this); + callback(); + })); } private: @@ -52,6 +62,11 @@ bool TCancelableContext::IsCanceled() const return Canceled_; } +const TError& TCancelableContext::GetCancelationError() const +{ + return CancelationError_; +} + void TCancelableContext::Cancel(const TError& error) { THashSet<TWeakPtr<TCancelableContext>> propagateToContexts; @@ -70,8 +85,7 @@ void TCancelableContext::Cancel(const TError& error) Handlers_.FireAndClear(error); for (const auto& weakContext : propagateToContexts) { - auto context = weakContext.Lock(); - if (context) { + if (auto context = weakContext.Lock()) { context->Cancel(error); } } diff --git a/yt/yt/core/actions/cancelable_context.h b/yt/yt/core/actions/cancelable_context.h index 709c3d0fab..d672583ff2 100644 --- a/yt/yt/core/actions/cancelable_context.h +++ b/yt/yt/core/actions/cancelable_context.h @@ -23,6 +23,9 @@ public: //! Returns |true| iff the context is canceled. bool IsCanceled() const; + //! Only safe to use after IsCanceled returned |true|. + const TError& GetCancelationError() const; + //! Marks the context as canceled raising the handlers //! and propagates cancelation. void Cancel(const TError& error); diff --git a/yt/yt/core/actions/cancelation_token-inl.h b/yt/yt/core/actions/cancelation_token-inl.h new file mode 100644 index 0000000000..682b18be19 --- /dev/null +++ b/yt/yt/core/actions/cancelation_token-inl.h @@ -0,0 +1,142 @@ +#ifndef CANCELATION_TOKEN_INL_H_ +#error "Direct inclusion of this file is not allowed, include cancelation_token.h" +// For the sake of sane code completion. +#include "cancelation_token.h" +#endif + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +template <class TDecayedConcrete> +constexpr TDecayedConcrete& TAnyCancelationToken::TStorage::AsConcrete() & noexcept +{ + using TPtr = TDecayedConcrete*; + + if constexpr (SmallToken<TDecayedConcrete>) { + return *std::launder(reinterpret_cast<TPtr>(&Storage_)); + } else { + return *static_cast<TPtr>(*std::launder(reinterpret_cast<void**>(&Storage_))); + } +} + +template <class TDecayedConcrete> +constexpr const TDecayedConcrete& TAnyCancelationToken::TStorage::AsConcrete() const & noexcept +{ + using TPtr = const TDecayedConcrete*; + + if constexpr (SmallToken<TDecayedConcrete>) { + return *std::launder(reinterpret_cast<TPtr>(&Storage_)); + } else { + return *static_cast<TPtr>(*std::launder(reinterpret_cast<void const* const*>(&Storage_))); + } +} + +template <class TDecayedConcrete> +constexpr TDecayedConcrete&& TAnyCancelationToken::TStorage::AsConcrete() && noexcept +{ + using TPtr = TDecayedConcrete*; + + if constexpr (SmallToken<TDecayedConcrete>) { + return std::move(*std::launder(reinterpret_cast<TPtr>(&Storage_))); + } else { + return std::move(*static_cast<TPtr>(*std::launder(reinterpret_cast<void**>(&Storage_)))); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +template <CCancelationToken TDecayedConcrete> +TAnyCancelationToken::TVTable TAnyCancelationToken::TVTable::Create() noexcept +{ + TVTable table = {}; + table.Dtor_ = +[] (TStorage& what) { + TAlloc allocator = {}; + + auto* ptr = &what.template AsConcrete<TDecayedConcrete>(); + std::destroy_at(ptr); + + if constexpr (!SmallToken<TDecayedConcrete>) { + TTraits::deallocate(allocator, reinterpret_cast<std::byte*>(ptr), sizeof(TDecayedConcrete)); + } + }; + + table.CopyCtor_ = +[] (TStorage& where, const TStorage& what) -> void { + TAlloc allocator = {}; + + if constexpr (SmallToken<TDecayedConcrete>) { + where.Set(); + } else { + where.Set(TTraits::allocate(allocator, sizeof(TDecayedConcrete))); + } + + TTraits::template construct<TDecayedConcrete>( + allocator, + &where.template AsConcrete<TDecayedConcrete>(), + what.template AsConcrete<TDecayedConcrete>()); + }; + + table.MoveCtor_ = +[] (TStorage& where, TStorage&& what) -> void { + if constexpr (SmallToken<TDecayedConcrete>) { + TAlloc allocator = {}; + + where.Set(); + + TTraits::template construct<TDecayedConcrete>( + allocator, + &where.template AsConcrete<TDecayedConcrete>(), + std::move(what).template AsConcrete<TDecayedConcrete>()); + TTraits::template destroy<TDecayedConcrete>( + allocator, + &what.template AsConcrete<TDecayedConcrete>()); + } else { + where.Set(static_cast<void*>(&what)); + } + }; + + table.IsCancelationRequested_ = +[] (const TStorage& what) -> bool { + return what.template AsConcrete<TDecayedConcrete>().IsCancelationRequested(); + }; + + table.CancellationError_ = +[] (const TStorage& what) -> const TError& { + return what.template AsConcrete<TDecayedConcrete>().GetCancelationError(); + }; + + return table; +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class TToken> + requires (!std::same_as<TAnyCancelationToken, std::remove_cvref_t<TToken>> && + CCancelationToken<std::remove_cvref_t<TToken>>) +TAnyCancelationToken::TAnyCancelationToken(TToken&& token) +{ + Set<TToken>(std::forward<TToken>(token)); +} + +template <class TToken> +void TAnyCancelationToken::Set(TToken&& token) +{ + using TDecayed = std::remove_cvref_t<TToken>; + TAlloc allocator = {}; + + Reset(); + + VTable_ = &StaticTable<TDecayed>; + + if constexpr (SmallToken<TDecayed>) { + Storage_.Set(); + } else { + Storage_.Set(TTraits::allocate(allocator, sizeof(TDecayed))); + } + + TTraits::template construct<TDecayed>( + allocator, + &Storage_.template AsConcrete<TDecayed>(), + std::forward<TToken>(token)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/yt/core/actions/cancelation_token.cpp b/yt/yt/core/actions/cancelation_token.cpp new file mode 100644 index 0000000000..c848df61ef --- /dev/null +++ b/yt/yt/core/actions/cancelation_token.cpp @@ -0,0 +1,220 @@ +#include "cancelation_token.h" + +#include "cancelable_context.h" +#include "future.h" + +#include <yt/yt/core/concurrency/fls.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +void TAnyCancelationToken::TStorage::Set(void* ptr) noexcept +{ + std::construct_at<void*>(reinterpret_cast<void**>(Storage_), ptr); +} + +void TAnyCancelationToken::TStorage::Set() noexcept +{ + std::construct_at(Storage_); +} + +//////////////////////////////////////////////////////////////////////////////// + +TAnyCancelationToken::TVTable::TDtor TAnyCancelationToken::TVTable::Dtor() const noexcept +{ + return Dtor_; +} + +TAnyCancelationToken::TVTable::TCopyCtor TAnyCancelationToken::TVTable::CopyCtor() const noexcept +{ + return CopyCtor_; +} + +TAnyCancelationToken::TVTable::TMoveCtor TAnyCancelationToken::TVTable::MoveCtor() const noexcept +{ + return MoveCtor_; +} + +TAnyCancelationToken::TVTable::TIsCancelationRequested TAnyCancelationToken::TVTable::IsCancelationRequested() const noexcept +{ + return IsCancelationRequested_; +} + +TAnyCancelationToken::TVTable::TCancellationError TAnyCancelationToken::TVTable::GetCancelationError() const noexcept +{ + return CancellationError_; +} + +//////////////////////////////////////////////////////////////////////////////// + +TAnyCancelationToken::TAnyCancelationToken(TAnyCancelationToken&& other) noexcept + : VTable_(other.VTable_) +{ + if (VTable_) { + auto moveCtor = VTable_->MoveCtor(); + moveCtor(Storage_, std::move(other.Storage_)); + + other.VTable_ = nullptr; + } +} + +TAnyCancelationToken& TAnyCancelationToken::operator= (TAnyCancelationToken&& other) noexcept +{ + if (this == &other) { + return *this; + } + + Reset(); + + VTable_ = other.VTable_; + + if (VTable_) { + auto moveCtor = VTable_->MoveCtor(); + moveCtor(Storage_, std::move(other.Storage_)); + + other.VTable_ = nullptr; + } + + return *this; +} + +TAnyCancelationToken::TAnyCancelationToken(const TAnyCancelationToken& other) noexcept + : VTable_(other.VTable_) +{ + if (VTable_) { + auto copyCtor = VTable_->CopyCtor(); + copyCtor(Storage_, other.Storage_); + } +} + +TAnyCancelationToken& TAnyCancelationToken::operator= (const TAnyCancelationToken& other) noexcept +{ + if (this == &other) { + return *this; + } + + Reset(); + VTable_ = other.VTable_; + + if (VTable_) { + auto copyCtor = VTable_->CopyCtor(); + copyCtor(Storage_, other.Storage_); + } + + return *this; +} + +TAnyCancelationToken::operator bool() const noexcept +{ + return VTable_ != nullptr; +} + +TAnyCancelationToken::~TAnyCancelationToken() +{ + Reset(); +} + +bool TAnyCancelationToken::IsCancelationRequested() const noexcept +{ + if (!VTable_) { + return false; + } + + return VTable_->IsCancelationRequested()(Storage_); +} + +const TError& TAnyCancelationToken::GetCancelationError() const noexcept +{ + YT_VERIFY(VTable_); + return VTable_->GetCancelationError()(Storage_); +} + +void TAnyCancelationToken::Reset() noexcept +{ + if (VTable_) { + auto dtor = VTable_->Dtor(); + dtor(Storage_); + VTable_ = nullptr; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +NConcurrency::TFlsSlot<TAnyCancelationToken> GlobalToken = {}; + +//////////////////////////////////////////////////////////////////////////////// + +TCurrentCancelationTokenGuard::TCurrentCancelationTokenGuard(TAnyCancelationToken nextToken) + : PrevToken_(std::exchange(*GlobalToken, std::move(nextToken))) +{ } + +TCurrentCancelationTokenGuard::~TCurrentCancelationTokenGuard() +{ + std::exchange(*GlobalToken, std::move(PrevToken_)); +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TTokenForCancelableContext +{ + TCancelableContextPtr Context; + + bool IsCancelationRequested() const noexcept + { + return Context->IsCanceled(); + } + + const TError& GetCancelationError() const + { + return Context->GetCancelationError(); + } +}; + +static_assert(CCancelationToken<TTokenForCancelableContext>); + +//////////////////////////////////////////////////////////////////////////////// + +struct TTokenForFuture +{ + explicit TTokenForFuture(TFutureState<void>* futureState) + : FutureState(futureState) + { } + + TFutureState<void>* FutureState; + + bool IsCancelationRequested() const noexcept + { + return FutureState->IsCanceled(); + } + + const TError& GetCancelationError() const + { + return FutureState->GetCancelationError(); + } +}; + +static_assert(CCancelationToken<TTokenForFuture>); + +//////////////////////////////////////////////////////////////////////////////// + +TCurrentCancelationTokenGuard MakeFutureCurrentTokenGuard(void* opaqueFutureState) +{ + return TCurrentCancelationTokenGuard{TTokenForFuture{static_cast<NDetail::TFutureState<void>*>(opaqueFutureState)}}; +} + +TCurrentCancelationTokenGuard MakeCancelableContextCurrentTokenGuard(const TCancelableContextPtr& context) +{ + return TCurrentCancelationTokenGuard{TTokenForCancelableContext{.Context = context}}; +} + +//////////////////////////////////////////////////////////////////////////////// + +const TAnyCancelationToken& GetCurrentCancelationToken() +{ + return *GlobalToken; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/yt/core/actions/cancelation_token.h b/yt/yt/core/actions/cancelation_token.h new file mode 100644 index 0000000000..f3f28aa5a2 --- /dev/null +++ b/yt/yt/core/actions/cancelation_token.h @@ -0,0 +1,151 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/misc/error.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +// CancelToken is an entity which you can ask whether cancellation has been +// requested or not. If it has been requested, you can ask for cancelation error. +template <class T> +concept CCancelationToken = requires (T& t) { + { t.IsCancelationRequested() } -> std::same_as<bool>; + { t.GetCancelationError() } -> std::same_as<const TError&>; +} && std::copyable<T>; + +//////////////////////////////////////////////////////////////////////////////// + +// We need to read/write global variable which satisfies concept CCancelationToken. +class TAnyCancelationToken +{ +private: + static constexpr size_t SmallTokenSize = sizeof(void*) * 2; + static constexpr size_t SmallTokenAlign = SmallTokenSize; + + template <class T> + static constexpr bool SmallToken = + (sizeof(T) <= SmallTokenSize) && + (alignof(T) <= SmallTokenAlign); + + using TAlloc = std::allocator<std::byte>; + using TTraits = std::allocator_traits<TAlloc>; + + class TStorage + { + public: + TStorage() = default; + + /*constexpr in C++23*/ void Set(void* ptr) noexcept; + /*constexpr in C++23*/ void Set() noexcept; + + template <class TDecayedConcrete> + constexpr TDecayedConcrete& AsConcrete() & noexcept; + + template <class TDecayedConcrete> + constexpr const TDecayedConcrete& AsConcrete() const & noexcept; + + template <class TDecayedConcrete> + constexpr TDecayedConcrete&& AsConcrete() && noexcept; + + private: + alignas(SmallTokenAlign) std::byte Storage_[SmallTokenSize]; + }; + + class TVTable + { + private: + using TDtor = void(*)(TStorage& what); + using TCopyCtor = void (*)(TStorage& where, const TStorage& what); + using TMoveCtor = void (*)(TStorage& where, TStorage&& what); + using TIsCancelationRequested = bool (*)(const TStorage& what); + using TCancellationError = const TError& (*)(const TStorage& what); + + public: + template <CCancelationToken TDecayedConcrete> + static TVTable Create() noexcept; + + TDtor Dtor() const noexcept; + TCopyCtor CopyCtor() const noexcept; + TMoveCtor MoveCtor() const noexcept; + TIsCancelationRequested IsCancelationRequested() const noexcept; + TCancellationError GetCancelationError() const noexcept; + + private: + TDtor Dtor_ = nullptr; + TCopyCtor CopyCtor_ = nullptr; + TMoveCtor MoveCtor_ = nullptr; + TIsCancelationRequested IsCancelationRequested_ = nullptr; + TCancellationError CancellationError_ = nullptr; + + TVTable() = default; + }; + + // Consider inline vtable storage. + template <CCancelationToken TToken> + static inline TVTable StaticTable = TVTable::template Create<TToken>(); + +public: + TAnyCancelationToken() = default; + + template <class TToken> + requires (!std::same_as<TAnyCancelationToken, std::remove_cvref_t<TToken>> && + CCancelationToken<std::remove_cvref_t<TToken>>) + TAnyCancelationToken(TToken&& token); + + TAnyCancelationToken(TAnyCancelationToken&& other) noexcept; + TAnyCancelationToken& operator= (TAnyCancelationToken&& other) noexcept; + + TAnyCancelationToken(const TAnyCancelationToken& other) noexcept; + TAnyCancelationToken& operator= (const TAnyCancelationToken& other) noexcept; + + explicit operator bool() const noexcept; + + ~TAnyCancelationToken(); + + bool IsCancelationRequested() const noexcept; + const TError& GetCancelationError() const noexcept; + +private: + TStorage Storage_ = {}; + TVTable* VTable_ = nullptr; + + void Reset() noexcept; + + template <class TToken> + void Set(TToken&& token); +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TCurrentCancelationTokenGuard +{ +public: + explicit TCurrentCancelationTokenGuard(TAnyCancelationToken nextToken); + ~TCurrentCancelationTokenGuard(); + + TCurrentCancelationTokenGuard(const TCurrentCancelationTokenGuard& other) = delete; + TCurrentCancelationTokenGuard& operator= (const TCurrentCancelationTokenGuard& other) = delete; + +private: + TAnyCancelationToken PrevToken_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TCurrentCancelationTokenGuard MakeFutureCurrentTokenGuard(void* opaqueFutureState); +TCurrentCancelationTokenGuard MakeCancelableContextCurrentTokenGuard(const TCancelableContextPtr& context); + +//////////////////////////////////////////////////////////////////////////////// + +const TAnyCancelationToken& GetCurrentCancelationToken(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail + +#define CANCELATION_TOKEN_INL_H_ +#include "cancelation_token-inl.h" +#undef CANCELATION_TOKEN_INL_H_ diff --git a/yt/yt/core/actions/future-inl.h b/yt/yt/core/actions/future-inl.h index 6ed80a4458..f2a1a9930f 100644 --- a/yt/yt/core/actions/future-inl.h +++ b/yt/yt/core/actions/future-inl.h @@ -6,6 +6,8 @@ #undef FUTURE_INL_H_ #include "bind.h" +#include "cancelation_token.h" +#include "invoker_util.h" #include <yt/yt/core/concurrency/delayed_executor.h> #include <yt/yt/core/concurrency/thread_affinity.h> @@ -40,21 +42,38 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////////// -inline NYT::TError MakeAbandonedError() +inline TError WrapIntoCancelationError(const TError& error) { - return NYT::TError(NYT::EErrorCode::Canceled, "Promise abandoned"); + return TError(EErrorCode::Canceled, "Operation canceled") + << error; } -inline NYT::TError MakeCanceledError(const NYT::TError& error) +inline TError TryExtractCancelationError() { - return NYT::TError(NYT::EErrorCode::Canceled, "Operation canceled") - << error; + const auto& currentToken = GetCurrentCancelationToken(); + + if (currentToken.IsCancelationRequested()) { + // NB(arkady-e1ppa): This clown fiesta is present because some external + // users managed to both hardcode "Promise abandoned" error message + // as the retriable one (or expected in tests) and + // rely on their cancelation error to never be wrapped + // into anything with a different error code. + const auto& tokenError = currentToken.GetCancelationError(); + return TError(tokenError.GetCode(), "Promise abandoned") << tokenError; + } + + return TError(EErrorCode::Canceled, "Promise abandoned"); } template <class T> -TFuture<T> MakeWellKnownFuture(NYT::TErrorOr<T> value) +TFuture<T> MakeWellKnownFuture(TErrorOr<T> value) { - return TFuture<T>(New<NYT::NDetail::TPromiseState<T>>(true, -1, -1, -1, std::move(value))); + return TFuture<T>(New<NDetail::TPromiseState<T>>( + /*wellKnown*/ true, + /*promiseRefCount*/ -1, + /*futureRefCount*/ -1, + /*cancelableRefCount*/ -1, + std::move(value))); } //////////////////////////////////////////////////////////////////////////////// @@ -314,6 +333,12 @@ public: return Canceled_; } + // Only safe to call after IsCanceled returned |true|. + const TError& GetCancelationError() const + { + return CancelationError_; + } + bool Wait(TDuration timeout) const; bool Wait(TInstant deadline) const; @@ -694,13 +719,18 @@ template <class T, class F> void InterceptExceptions(const TPromise<T>& promise, const F& func) { try { + auto guard = MakeFutureCurrentTokenGuard(promise.Impl_.Get()); func(); } catch (const NYT::TErrorException& ex) { promise.Set(ex.Error()); } catch (const std::exception& ex) { promise.Set(NYT::TError(ex)); } catch (const NConcurrency::TFiberCanceledException&) { - promise.Set(MakeAbandonedError()); + if (auto error = promise.GetCancelationError(); !error.IsOK()) { + promise.Set(std::move(error)); + } else { + promise.Set(TryExtractCancelationError()); + } } } @@ -1123,7 +1153,7 @@ TFuture<T> TFutureBase<T>::ToImmediatelyCancelable() const promise.OnCanceled(BIND_NO_PROPAGATE([=, cancelable = AsCancelable()] (const NYT::TError& error) { cancelable.Cancel(error); - promise.TrySet(NYT::NDetail::MakeCanceledError(error)); + promise.TrySet(NYT::NDetail::WrapIntoCancelationError(error)); })); return promise; @@ -1427,6 +1457,15 @@ bool TPromiseBase<T>::IsCanceled() const } template <class T> +NYT::TError TPromiseBase<T>::GetCancelationError() const +{ + if (!IsCanceled()) { + return NYT::TError{}; + } + return Impl_->GetCancelationError(); +} + +template <class T> bool TPromiseBase<T>::OnCanceled(TCallback<void(const NYT::TError&)> handler) const { YT_ASSERT(Impl_); @@ -1574,11 +1613,29 @@ struct TAsyncViaHelper<R(TArgs...)> TArgs... args) { auto promise = NewPromise<TUnderlying>(); - invoker->Invoke(BIND_NO_PROPAGATE( - &Inner, - std::move(this_), - promise, - WrapToPassed(std::forward<TArgs>(args))...)); + auto makeOnSuccess = [&] <size_t... Indeces> (std::index_sequence<Indeces...>) mutable { + return + [ + promise, + this_ = std::move(this_), + // NB(arkady-e1ppa): TArgs = [std::tuple<...>] will cause + // copy/move ctor inference instead of a nested tuple if we don't + // write template arguments explicitly. + tuple = std::tuple<std::remove_reference_t<TArgs>...>(std::forward<TArgs>(args)...) + ] () mutable { + if constexpr (sizeof...(TArgs) == 0) { + Y_UNUSED(tuple); + } + Inner(std::move(this_), promise, std::forward<TArgs>(std::get<Indeces>(tuple))...); + }; + }; + + GuardedInvoke( + invoker, + makeOnSuccess(std::make_index_sequence<sizeof...(TArgs)>()), + [promise] { + promise.Set(TryExtractCancelationError()); + }); return promise; } @@ -1589,13 +1646,13 @@ struct TAsyncViaHelper<R(TArgs...)> TArgs... args) { auto promise = NewPromise<TUnderlying>(); - auto makeOnSuccess = [&] <size_t... Indeces> (std::index_sequence<Indeces...>) { + auto makeOnSuccess = [&] <size_t... Indeces> (std::index_sequence<Indeces...>) mutable { return [ promise, this_ = std::move(this_), - tuple = std::tuple(std::forward<TArgs>(args)...) - ] { + tuple = std::tuple<std::remove_reference_t<TArgs>...>(std::forward<TArgs>(args)...) + ] () mutable { if constexpr (sizeof...(TArgs) == 0) { Y_UNUSED(tuple); } diff --git a/yt/yt/core/actions/future.cpp b/yt/yt/core/actions/future.cpp index 1dc54ca780..58d11312f1 100644 --- a/yt/yt/core/actions/future.cpp +++ b/yt/yt/core/actions/future.cpp @@ -73,7 +73,7 @@ bool TFutureState<void>::Cancel(const TError& error) noexcept } if (CancelHandlers_.empty()) { - if (!TrySetError(NDetail::MakeCanceledError(error))) { + if (!TrySetError(NDetail::WrapIntoCancelationError(error))) { return false; } } else { @@ -150,7 +150,9 @@ void TFutureState<void>::InstallAbandonedError() { VERIFY_SPINLOCK_AFFINITY(SpinLock_); if (AbandonedUnset_ && !Set_) { - SetResultError(NDetail::MakeAbandonedError()); + // NB(arkady-e1ppa): Once AbandonedUnset_ is |true| we are guaranteed + // to have a cancelation error. + SetResultError(CancelationError_); Set_ = true; } } @@ -230,12 +232,14 @@ void TFutureState<void>::OnLastPromiseRefLost() } // Another fast path: no subscribers. - if ([&] { + auto cancelationError = TryExtractCancelationError(); + if ([&] () noexcept { auto guard = Guard(SpinLock_); if (ReadyEvent_ || HasHandlers_ || Canceled_) { return false; } YT_ASSERT(!AbandonedUnset_); + CancelationError_ = std::move(cancelationError); AbandonedUnset_ = true; // Cannot access this after UnrefFuture; in particular, cannot touch SpinLock_ in guard's dtor. guard.Release(); @@ -247,9 +251,9 @@ void TFutureState<void>::OnLastPromiseRefLost() } // Slow path: notify the subscribers in a dedicated thread. - GetFinalizerInvoker()->Invoke(BIND_NO_PROPAGATE([this] { + GetFinalizerInvoker()->Invoke(BIND_NO_PROPAGATE([this, error = std::move(cancelationError)] { // Set the promise if the value is still missing. - TrySetError(NDetail::MakeAbandonedError()); + TrySetError(std::move(error)); // Kill the fake weak reference. UnrefFuture(); })); diff --git a/yt/yt/core/actions/future.h b/yt/yt/core/actions/future.h index a90ae22b4f..768cf930ec 100644 --- a/yt/yt/core/actions/future.h +++ b/yt/yt/core/actions/future.h @@ -71,6 +71,9 @@ constexpr bool IsFuture = false; template <class T> constexpr bool IsFuture<TFuture<T>> = true; +template <class U, class F> +void InterceptExceptions(const TPromise<U>& promise, const F& func); + } // namespace NDetail //////////////////////////////////////////////////////////////////////////////// @@ -470,6 +473,9 @@ public: //! Checks if the promise is canceled. bool IsCanceled() const; + //! Returns cancelation error if one is present. + TError GetCancelationError() const; + //! Attaches a cancellation handler. /*! * \param handler A callback to call when TFuture<T>::Cancel is triggered @@ -498,6 +504,10 @@ protected: friend void swap(TPromise<U>& lhs, TPromise<U>& rhs); template <class U> friend struct ::hash; + template <class U, class F> + friend void ::NYT::NDetail::InterceptExceptions(const TPromise<U>& promise, const F& func); + + void TrySetCanceled(const TError& error); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/actions/unittests/cancelation_token_ut.cpp b/yt/yt/core/actions/unittests/cancelation_token_ut.cpp new file mode 100644 index 0000000000..c271225ba6 --- /dev/null +++ b/yt/yt/core/actions/unittests/cancelation_token_ut.cpp @@ -0,0 +1,148 @@ +#include <gtest/gtest.h> + +#include <yt/yt/core/actions/cancelation_token.h> + +namespace NYT::NDetail { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +struct TSimpleToken +{ + bool IsCancelationRequested() const noexcept + { + return !Error.IsOK(); + } + + const TError& GetCancelationError() const noexcept + { + return Error; + } + + TError Error; + + TSimpleToken() + { + ++CtorCount; + } + + TSimpleToken(TError error) + : Error(std::move(error)) + { } + + TSimpleToken(const TSimpleToken& other) + : Error(other.Error) + { + ++CopyCount; + } + + TSimpleToken& operator=(const TSimpleToken&) = default; + TSimpleToken& operator=(TSimpleToken&&) = default; + + TSimpleToken(TSimpleToken&& other) + : Error(std::move(other.Error)) + { + ++MoveCount; + } + + ~TSimpleToken() + { + ++DtorCount; + } + + static inline int CtorCount = 0; + static inline int DtorCount = 0; + static inline int CopyCount = 0; + static inline int MoveCount = 0; +}; + +static_assert(CCancelationToken<TSimpleToken>); + +//////////////////////////////////////////////////////////////////////////////// + +void ResetCounters() +{ + TSimpleToken::CtorCount = 0; + TSimpleToken::DtorCount = 0; + TSimpleToken::CopyCount = 0; + TSimpleToken::MoveCount = 0; +} + +TEST(TAnyTokenTest, JustWorks) +{ + ResetCounters(); + TAnyCancelationToken any{TSimpleToken{}}; + EXPECT_FALSE(any.IsCancelationRequested()); +} + +TEST(TAnyTokenTest, Copy) +{ + ResetCounters(); + TSimpleToken token{TError(EErrorCode::Canceled, "Boo")}; + + TAnyCancelationToken any{token}; + EXPECT_EQ(TSimpleToken::CtorCount, 0); + EXPECT_EQ(TSimpleToken::CopyCount, 1); + EXPECT_EQ(TSimpleToken::MoveCount, 0); + + EXPECT_TRUE(any.IsCancelationRequested()); + + token.Error = TError{}; + + TAnyCancelationToken any1{}; + + // NB: Implicit move ctor. + any1 = token; + EXPECT_EQ(TSimpleToken::CtorCount, 0); + EXPECT_EQ(TSimpleToken::CopyCount, 2); + EXPECT_EQ(TSimpleToken::MoveCount, 1); + EXPECT_EQ(TSimpleToken::DtorCount, 1); + EXPECT_FALSE(any1.IsCancelationRequested()); + + any1 = any; + EXPECT_EQ(TSimpleToken::CtorCount, 0); + EXPECT_EQ(TSimpleToken::CopyCount, 3); + EXPECT_EQ(TSimpleToken::MoveCount, 1); + EXPECT_EQ(TSimpleToken::DtorCount, 2); + EXPECT_TRUE(any1.IsCancelationRequested()); +} + +TEST(TAnyTokenTest, MoveSmallToken) +{ + ResetCounters(); + TSimpleToken token{TError(EErrorCode::Canceled, "Oi")}; + + TAnyCancelationToken any{std::move(token)}; + EXPECT_EQ(TSimpleToken::CtorCount, 0); + EXPECT_EQ(TSimpleToken::CopyCount, 0); + EXPECT_EQ(TSimpleToken::MoveCount, 1); + + EXPECT_TRUE(any.IsCancelationRequested()); + + token.Error = TError{}; + + TAnyCancelationToken any1{}; + EXPECT_EQ(TSimpleToken::CtorCount, 0); + EXPECT_EQ(TSimpleToken::CopyCount, 0); + EXPECT_EQ(TSimpleToken::MoveCount, 1); + EXPECT_EQ(TSimpleToken::DtorCount, 0); + + any1 = std::move(token); + EXPECT_EQ(TSimpleToken::CtorCount, 0); + EXPECT_EQ(TSimpleToken::CopyCount, 0); + EXPECT_EQ(TSimpleToken::MoveCount, 3); + EXPECT_EQ(TSimpleToken::DtorCount, 1); + EXPECT_FALSE(any1.IsCancelationRequested()); + + any1 = std::move(any); + EXPECT_EQ(TSimpleToken::CtorCount, 0); + EXPECT_EQ(TSimpleToken::CopyCount, 0); + EXPECT_EQ(TSimpleToken::MoveCount, 4); + EXPECT_EQ(TSimpleToken::DtorCount, 3); + EXPECT_TRUE(any1.IsCancelationRequested()); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NDetail diff --git a/yt/yt/core/actions/unittests/future_ut.cpp b/yt/yt/core/actions/unittests/future_ut.cpp index b2d90bea48..158cb3be5b 100644 --- a/yt/yt/core/actions/unittests/future_ut.cpp +++ b/yt/yt/core/actions/unittests/future_ut.cpp @@ -4,6 +4,8 @@ #include <yt/yt/core/actions/future.h> #include <yt/yt/core/actions/invoker_util.h> +#include <yt/yt/core/concurrency/action_queue.h> + #include <yt/yt/core/misc/ref_counted_tracker.h> #include <yt/yt/core/misc/mpsc_stack.h> @@ -1410,16 +1412,6 @@ TEST_F(TFutureTest, AnyNCombinerDontPropagateCancelation) EXPECT_FALSE(p2.IsCanceled()); } -TEST_F(TFutureTest, AsyncViaCanceledInvoker) -{ - auto context = New<TCancelableContext>(); - auto invoker = context->CreateInvoker(GetSyncInvoker()); - auto generator = BIND([] {}).AsyncVia(invoker); - context->Cancel(TError("oops")); - auto future = generator(); - auto error = future.Get(); - ASSERT_EQ(NYT::EErrorCode::Canceled, error.GetCode()); -} //////////////////////////////////////////////////////////////////////////////// TEST_F(TFutureTest, LastPromiseDied) @@ -1717,6 +1709,99 @@ TEST_F(TFutureTest, CancelAppliedToUncancellable) EXPECT_TRUE(future1.Get().IsOK()); } +TEST_F(TFutureTest, AsyncViaCanceledInvoker1) +{ + auto queue = New<NConcurrency::TActionQueue>(); + auto context = New<TCancelableContext>(); + auto invoker = context->CreateInvoker(queue->GetInvoker()); + + context->Cancel(TError(EErrorCode::Canceled, "From cancelable context!")); + + auto error = WaitFor(BIND([] {}).AsyncVia(invoker).Run()); + + EXPECT_FALSE(error.IsOK()); + EXPECT_EQ(error.GetCode(), EErrorCode::Canceled); + EXPECT_TRUE(NYT::ToString(error).Contains("From cancelable context!")) + << NYT::ToString(error); +} + +TEST_F(TFutureTest, AsyncViaCanceledInvoker2) +{ + auto queue = New<NConcurrency::TActionQueue>(); + auto context = New<TCancelableContext>(); + auto invoker = context->CreateInvoker(queue->GetInvoker()); + + auto taskReady = NewPromise<void>(); + auto promise = NewPromise<void>(); + + auto future = BIND([promise, taskReady, invoker] { + taskReady.Set(); + WaitFor(promise.ToFuture(), invoker).ThrowOnError(); + }) + .AsyncVia(invoker) + .Run(); + + WaitFor(taskReady.ToFuture()).ThrowOnError(); + + context->Cancel(TError(EErrorCode::Canceled, "From cancelable context!")); + promise.Set(); + + auto error = WaitFor(future); + + EXPECT_FALSE(error.IsOK()); + EXPECT_EQ(error.GetCode(), EErrorCode::Canceled); + EXPECT_TRUE(NYT::ToString(error).Contains("From cancelable context!")) + << NYT::ToString(error); +} + +TEST_F(TFutureTest, YT_12720) +{ + auto aqueue = New<NConcurrency::TActionQueue>(); + auto invoker = aqueue->GetInvoker(); + auto leash = NewPromise<void>(); + auto taskStarted = NewPromise<void>(); + auto future = BIND([leash, taskStarted] { + taskStarted.Set(); + WaitFor(leash.ToFuture()).ThrowOnError(); + }).AsyncVia(invoker).Run(); + + WaitFor(taskStarted.ToFuture()).ThrowOnError(); + + future.Cancel(NYT::TError(EErrorCode::Canceled, "Fiber canceled in .Reset() of TFiberGuard")); + auto error = WaitFor(future); + EXPECT_FALSE(error.IsOK()); + EXPECT_EQ(error.GetCode(), EErrorCode::Canceled); + EXPECT_TRUE(NYT::ToString(error).Contains("Fiber canceled in .Reset() of TFiberGuard")) + << NYT::ToString(error); +} + +TEST_F(TFutureTest, DiscardInApply) +{ + auto aqueue = New<NConcurrency::TActionQueue>(); + auto invoker = aqueue->GetInvoker(); + + auto promise = NewPromise<void>(); + auto future = promise.ToFuture(); + + // NB(arkady-e1ppa): mutable is required to destructively move promise out of the + // closure thus forcing it to be destroyed inside the scope. + auto canceled = BIND([p = std::move(promise)] () mutable { + auto localPromise = std::move(p); + while (true) { + Yield(); + } + }).AsyncVia(invoker).Run(); + + Sleep(std::chrono::seconds(1)); + + canceled.Cancel(TError(EErrorCode::Canceled, "Canceled!")); + + auto error = WaitFor(future); + EXPECT_EQ(error.GetCode(), EErrorCode::Canceled); + EXPECT_TRUE(NYT::ToString(error).Contains("Canceled!")) + << NYT::ToString(error); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace diff --git a/yt/yt/core/actions/unittests/ya.make b/yt/yt/core/actions/unittests/ya.make index 8a91ecd927..b1cf897388 100644 --- a/yt/yt/core/actions/unittests/ya.make +++ b/yt/yt/core/actions/unittests/ya.make @@ -11,6 +11,7 @@ PROTO_NAMESPACE(yt) SRCS( actions_ut.cpp bind_ut.cpp + cancelation_token_ut.cpp future_ut.cpp invoker_ut.cpp new_with_offloaded_dtor_ut.cpp diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index c2c3ccd3ed..ceb8a9b8fc 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -12,6 +12,7 @@ ENDIF() NO_LTO() SRCS( + actions/cancelation_token.cpp actions/cancelable_context.cpp actions/current_invoker.cpp actions/future.cpp |