aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-10-02 11:06:19 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-10-02 11:21:30 +0300
commita830a77de6483eb53f266f2161931315324cacca (patch)
treeb0d7cfae4c8b34f1f5851528d3866042215b2cf1 /yt
parent2cfac96412e943de3a142da74b90da74fece6920 (diff)
downloadydb-a830a77de6483eb53f266f2161931315324cacca.tar.gz
YT-12720: Capture cancelation error in some new cases
commit_hash:db414aec089ec1f05bb59c309b763c4e403bae9e
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/actions/cancelable_context.cpp34
-rw-r--r--yt/yt/core/actions/cancelable_context.h3
-rw-r--r--yt/yt/core/actions/cancelation_token-inl.h142
-rw-r--r--yt/yt/core/actions/cancelation_token.cpp220
-rw-r--r--yt/yt/core/actions/cancelation_token.h151
-rw-r--r--yt/yt/core/actions/future-inl.h91
-rw-r--r--yt/yt/core/actions/future.cpp14
-rw-r--r--yt/yt/core/actions/future.h10
-rw-r--r--yt/yt/core/actions/unittests/cancelation_token_ut.cpp148
-rw-r--r--yt/yt/core/actions/unittests/future_ut.cpp105
-rw-r--r--yt/yt/core/actions/unittests/ya.make1
-rw-r--r--yt/yt/core/ya.make1
12 files changed, 878 insertions, 42 deletions
diff --git a/yt/yt/core/actions/cancelable_context.cpp b/yt/yt/core/actions/cancelable_context.cpp
index c04b6c27b0..cb3502d530 100644
--- a/yt/yt/core/actions/cancelable_context.cpp
+++ b/yt/yt/core/actions/cancelable_context.cpp
@@ -25,19 +25,29 @@ public:
void Invoke(TClosure callback) override
{
YT_ASSERT(callback);
+ auto guard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
if (Context_->Canceled_) {
+ callback.Reset();
return;
}
- return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE([this, this_ = MakeStrong(this), callback = std::move(callback)] {
- if (Context_->Canceled_) {
- return;
- }
-
- TCurrentInvokerGuard guard(this);
- callback();
- }));
+ return UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE(
+ [
+ this,
+ this_ = MakeStrong(this),
+ callback = std::move(callback)
+ ] () mutable {
+ auto currentTokenGuard = NDetail::MakeCancelableContextCurrentTokenGuard(Context_);
+
+ if (Context_->Canceled_) {
+ callback.Reset();
+ return;
+ }
+
+ TCurrentInvokerGuard guard(this);
+ callback();
+ }));
}
private:
@@ -52,6 +62,11 @@ bool TCancelableContext::IsCanceled() const
return Canceled_;
}
+const TError& TCancelableContext::GetCancelationError() const
+{
+ return CancelationError_;
+}
+
void TCancelableContext::Cancel(const TError& error)
{
THashSet<TWeakPtr<TCancelableContext>> propagateToContexts;
@@ -70,8 +85,7 @@ void TCancelableContext::Cancel(const TError& error)
Handlers_.FireAndClear(error);
for (const auto& weakContext : propagateToContexts) {
- auto context = weakContext.Lock();
- if (context) {
+ if (auto context = weakContext.Lock()) {
context->Cancel(error);
}
}
diff --git a/yt/yt/core/actions/cancelable_context.h b/yt/yt/core/actions/cancelable_context.h
index 709c3d0fab..d672583ff2 100644
--- a/yt/yt/core/actions/cancelable_context.h
+++ b/yt/yt/core/actions/cancelable_context.h
@@ -23,6 +23,9 @@ public:
//! Returns |true| iff the context is canceled.
bool IsCanceled() const;
+ //! Only safe to use after IsCanceled returned |true|.
+ const TError& GetCancelationError() const;
+
//! Marks the context as canceled raising the handlers
//! and propagates cancelation.
void Cancel(const TError& error);
diff --git a/yt/yt/core/actions/cancelation_token-inl.h b/yt/yt/core/actions/cancelation_token-inl.h
new file mode 100644
index 0000000000..682b18be19
--- /dev/null
+++ b/yt/yt/core/actions/cancelation_token-inl.h
@@ -0,0 +1,142 @@
+#ifndef CANCELATION_TOKEN_INL_H_
+#error "Direct inclusion of this file is not allowed, include cancelation_token.h"
+// For the sake of sane code completion.
+#include "cancelation_token.h"
+#endif
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TDecayedConcrete>
+constexpr TDecayedConcrete& TAnyCancelationToken::TStorage::AsConcrete() & noexcept
+{
+ using TPtr = TDecayedConcrete*;
+
+ if constexpr (SmallToken<TDecayedConcrete>) {
+ return *std::launder(reinterpret_cast<TPtr>(&Storage_));
+ } else {
+ return *static_cast<TPtr>(*std::launder(reinterpret_cast<void**>(&Storage_)));
+ }
+}
+
+template <class TDecayedConcrete>
+constexpr const TDecayedConcrete& TAnyCancelationToken::TStorage::AsConcrete() const & noexcept
+{
+ using TPtr = const TDecayedConcrete*;
+
+ if constexpr (SmallToken<TDecayedConcrete>) {
+ return *std::launder(reinterpret_cast<TPtr>(&Storage_));
+ } else {
+ return *static_cast<TPtr>(*std::launder(reinterpret_cast<void const* const*>(&Storage_)));
+ }
+}
+
+template <class TDecayedConcrete>
+constexpr TDecayedConcrete&& TAnyCancelationToken::TStorage::AsConcrete() && noexcept
+{
+ using TPtr = TDecayedConcrete*;
+
+ if constexpr (SmallToken<TDecayedConcrete>) {
+ return std::move(*std::launder(reinterpret_cast<TPtr>(&Storage_)));
+ } else {
+ return std::move(*static_cast<TPtr>(*std::launder(reinterpret_cast<void**>(&Storage_))));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <CCancelationToken TDecayedConcrete>
+TAnyCancelationToken::TVTable TAnyCancelationToken::TVTable::Create() noexcept
+{
+ TVTable table = {};
+ table.Dtor_ = +[] (TStorage& what) {
+ TAlloc allocator = {};
+
+ auto* ptr = &what.template AsConcrete<TDecayedConcrete>();
+ std::destroy_at(ptr);
+
+ if constexpr (!SmallToken<TDecayedConcrete>) {
+ TTraits::deallocate(allocator, reinterpret_cast<std::byte*>(ptr), sizeof(TDecayedConcrete));
+ }
+ };
+
+ table.CopyCtor_ = +[] (TStorage& where, const TStorage& what) -> void {
+ TAlloc allocator = {};
+
+ if constexpr (SmallToken<TDecayedConcrete>) {
+ where.Set();
+ } else {
+ where.Set(TTraits::allocate(allocator, sizeof(TDecayedConcrete)));
+ }
+
+ TTraits::template construct<TDecayedConcrete>(
+ allocator,
+ &where.template AsConcrete<TDecayedConcrete>(),
+ what.template AsConcrete<TDecayedConcrete>());
+ };
+
+ table.MoveCtor_ = +[] (TStorage& where, TStorage&& what) -> void {
+ if constexpr (SmallToken<TDecayedConcrete>) {
+ TAlloc allocator = {};
+
+ where.Set();
+
+ TTraits::template construct<TDecayedConcrete>(
+ allocator,
+ &where.template AsConcrete<TDecayedConcrete>(),
+ std::move(what).template AsConcrete<TDecayedConcrete>());
+ TTraits::template destroy<TDecayedConcrete>(
+ allocator,
+ &what.template AsConcrete<TDecayedConcrete>());
+ } else {
+ where.Set(static_cast<void*>(&what));
+ }
+ };
+
+ table.IsCancelationRequested_ = +[] (const TStorage& what) -> bool {
+ return what.template AsConcrete<TDecayedConcrete>().IsCancelationRequested();
+ };
+
+ table.CancellationError_ = +[] (const TStorage& what) -> const TError& {
+ return what.template AsConcrete<TDecayedConcrete>().GetCancelationError();
+ };
+
+ return table;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TToken>
+ requires (!std::same_as<TAnyCancelationToken, std::remove_cvref_t<TToken>> &&
+ CCancelationToken<std::remove_cvref_t<TToken>>)
+TAnyCancelationToken::TAnyCancelationToken(TToken&& token)
+{
+ Set<TToken>(std::forward<TToken>(token));
+}
+
+template <class TToken>
+void TAnyCancelationToken::Set(TToken&& token)
+{
+ using TDecayed = std::remove_cvref_t<TToken>;
+ TAlloc allocator = {};
+
+ Reset();
+
+ VTable_ = &StaticTable<TDecayed>;
+
+ if constexpr (SmallToken<TDecayed>) {
+ Storage_.Set();
+ } else {
+ Storage_.Set(TTraits::allocate(allocator, sizeof(TDecayed)));
+ }
+
+ TTraits::template construct<TDecayed>(
+ allocator,
+ &Storage_.template AsConcrete<TDecayed>(),
+ std::forward<TToken>(token));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/yt/core/actions/cancelation_token.cpp b/yt/yt/core/actions/cancelation_token.cpp
new file mode 100644
index 0000000000..c848df61ef
--- /dev/null
+++ b/yt/yt/core/actions/cancelation_token.cpp
@@ -0,0 +1,220 @@
+#include "cancelation_token.h"
+
+#include "cancelable_context.h"
+#include "future.h"
+
+#include <yt/yt/core/concurrency/fls.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TAnyCancelationToken::TStorage::Set(void* ptr) noexcept
+{
+ std::construct_at<void*>(reinterpret_cast<void**>(Storage_), ptr);
+}
+
+void TAnyCancelationToken::TStorage::Set() noexcept
+{
+ std::construct_at(Storage_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAnyCancelationToken::TVTable::TDtor TAnyCancelationToken::TVTable::Dtor() const noexcept
+{
+ return Dtor_;
+}
+
+TAnyCancelationToken::TVTable::TCopyCtor TAnyCancelationToken::TVTable::CopyCtor() const noexcept
+{
+ return CopyCtor_;
+}
+
+TAnyCancelationToken::TVTable::TMoveCtor TAnyCancelationToken::TVTable::MoveCtor() const noexcept
+{
+ return MoveCtor_;
+}
+
+TAnyCancelationToken::TVTable::TIsCancelationRequested TAnyCancelationToken::TVTable::IsCancelationRequested() const noexcept
+{
+ return IsCancelationRequested_;
+}
+
+TAnyCancelationToken::TVTable::TCancellationError TAnyCancelationToken::TVTable::GetCancelationError() const noexcept
+{
+ return CancellationError_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TAnyCancelationToken::TAnyCancelationToken(TAnyCancelationToken&& other) noexcept
+ : VTable_(other.VTable_)
+{
+ if (VTable_) {
+ auto moveCtor = VTable_->MoveCtor();
+ moveCtor(Storage_, std::move(other.Storage_));
+
+ other.VTable_ = nullptr;
+ }
+}
+
+TAnyCancelationToken& TAnyCancelationToken::operator= (TAnyCancelationToken&& other) noexcept
+{
+ if (this == &other) {
+ return *this;
+ }
+
+ Reset();
+
+ VTable_ = other.VTable_;
+
+ if (VTable_) {
+ auto moveCtor = VTable_->MoveCtor();
+ moveCtor(Storage_, std::move(other.Storage_));
+
+ other.VTable_ = nullptr;
+ }
+
+ return *this;
+}
+
+TAnyCancelationToken::TAnyCancelationToken(const TAnyCancelationToken& other) noexcept
+ : VTable_(other.VTable_)
+{
+ if (VTable_) {
+ auto copyCtor = VTable_->CopyCtor();
+ copyCtor(Storage_, other.Storage_);
+ }
+}
+
+TAnyCancelationToken& TAnyCancelationToken::operator= (const TAnyCancelationToken& other) noexcept
+{
+ if (this == &other) {
+ return *this;
+ }
+
+ Reset();
+ VTable_ = other.VTable_;
+
+ if (VTable_) {
+ auto copyCtor = VTable_->CopyCtor();
+ copyCtor(Storage_, other.Storage_);
+ }
+
+ return *this;
+}
+
+TAnyCancelationToken::operator bool() const noexcept
+{
+ return VTable_ != nullptr;
+}
+
+TAnyCancelationToken::~TAnyCancelationToken()
+{
+ Reset();
+}
+
+bool TAnyCancelationToken::IsCancelationRequested() const noexcept
+{
+ if (!VTable_) {
+ return false;
+ }
+
+ return VTable_->IsCancelationRequested()(Storage_);
+}
+
+const TError& TAnyCancelationToken::GetCancelationError() const noexcept
+{
+ YT_VERIFY(VTable_);
+ return VTable_->GetCancelationError()(Storage_);
+}
+
+void TAnyCancelationToken::Reset() noexcept
+{
+ if (VTable_) {
+ auto dtor = VTable_->Dtor();
+ dtor(Storage_);
+ VTable_ = nullptr;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+NConcurrency::TFlsSlot<TAnyCancelationToken> GlobalToken = {};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCurrentCancelationTokenGuard::TCurrentCancelationTokenGuard(TAnyCancelationToken nextToken)
+ : PrevToken_(std::exchange(*GlobalToken, std::move(nextToken)))
+{ }
+
+TCurrentCancelationTokenGuard::~TCurrentCancelationTokenGuard()
+{
+ std::exchange(*GlobalToken, std::move(PrevToken_));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TTokenForCancelableContext
+{
+ TCancelableContextPtr Context;
+
+ bool IsCancelationRequested() const noexcept
+ {
+ return Context->IsCanceled();
+ }
+
+ const TError& GetCancelationError() const
+ {
+ return Context->GetCancelationError();
+ }
+};
+
+static_assert(CCancelationToken<TTokenForCancelableContext>);
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TTokenForFuture
+{
+ explicit TTokenForFuture(TFutureState<void>* futureState)
+ : FutureState(futureState)
+ { }
+
+ TFutureState<void>* FutureState;
+
+ bool IsCancelationRequested() const noexcept
+ {
+ return FutureState->IsCanceled();
+ }
+
+ const TError& GetCancelationError() const
+ {
+ return FutureState->GetCancelationError();
+ }
+};
+
+static_assert(CCancelationToken<TTokenForFuture>);
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCurrentCancelationTokenGuard MakeFutureCurrentTokenGuard(void* opaqueFutureState)
+{
+ return TCurrentCancelationTokenGuard{TTokenForFuture{static_cast<NDetail::TFutureState<void>*>(opaqueFutureState)}};
+}
+
+TCurrentCancelationTokenGuard MakeCancelableContextCurrentTokenGuard(const TCancelableContextPtr& context)
+{
+ return TCurrentCancelationTokenGuard{TTokenForCancelableContext{.Context = context}};
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+const TAnyCancelationToken& GetCurrentCancelationToken()
+{
+ return *GlobalToken;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/yt/core/actions/cancelation_token.h b/yt/yt/core/actions/cancelation_token.h
new file mode 100644
index 0000000000..f3f28aa5a2
--- /dev/null
+++ b/yt/yt/core/actions/cancelation_token.h
@@ -0,0 +1,151 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/misc/error.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// CancelToken is an entity which you can ask whether cancellation has been
+// requested or not. If it has been requested, you can ask for cancelation error.
+template <class T>
+concept CCancelationToken = requires (T& t) {
+ { t.IsCancelationRequested() } -> std::same_as<bool>;
+ { t.GetCancelationError() } -> std::same_as<const TError&>;
+} && std::copyable<T>;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// We need to read/write global variable which satisfies concept CCancelationToken.
+class TAnyCancelationToken
+{
+private:
+ static constexpr size_t SmallTokenSize = sizeof(void*) * 2;
+ static constexpr size_t SmallTokenAlign = SmallTokenSize;
+
+ template <class T>
+ static constexpr bool SmallToken =
+ (sizeof(T) <= SmallTokenSize) &&
+ (alignof(T) <= SmallTokenAlign);
+
+ using TAlloc = std::allocator<std::byte>;
+ using TTraits = std::allocator_traits<TAlloc>;
+
+ class TStorage
+ {
+ public:
+ TStorage() = default;
+
+ /*constexpr in C++23*/ void Set(void* ptr) noexcept;
+ /*constexpr in C++23*/ void Set() noexcept;
+
+ template <class TDecayedConcrete>
+ constexpr TDecayedConcrete& AsConcrete() & noexcept;
+
+ template <class TDecayedConcrete>
+ constexpr const TDecayedConcrete& AsConcrete() const & noexcept;
+
+ template <class TDecayedConcrete>
+ constexpr TDecayedConcrete&& AsConcrete() && noexcept;
+
+ private:
+ alignas(SmallTokenAlign) std::byte Storage_[SmallTokenSize];
+ };
+
+ class TVTable
+ {
+ private:
+ using TDtor = void(*)(TStorage& what);
+ using TCopyCtor = void (*)(TStorage& where, const TStorage& what);
+ using TMoveCtor = void (*)(TStorage& where, TStorage&& what);
+ using TIsCancelationRequested = bool (*)(const TStorage& what);
+ using TCancellationError = const TError& (*)(const TStorage& what);
+
+ public:
+ template <CCancelationToken TDecayedConcrete>
+ static TVTable Create() noexcept;
+
+ TDtor Dtor() const noexcept;
+ TCopyCtor CopyCtor() const noexcept;
+ TMoveCtor MoveCtor() const noexcept;
+ TIsCancelationRequested IsCancelationRequested() const noexcept;
+ TCancellationError GetCancelationError() const noexcept;
+
+ private:
+ TDtor Dtor_ = nullptr;
+ TCopyCtor CopyCtor_ = nullptr;
+ TMoveCtor MoveCtor_ = nullptr;
+ TIsCancelationRequested IsCancelationRequested_ = nullptr;
+ TCancellationError CancellationError_ = nullptr;
+
+ TVTable() = default;
+ };
+
+ // Consider inline vtable storage.
+ template <CCancelationToken TToken>
+ static inline TVTable StaticTable = TVTable::template Create<TToken>();
+
+public:
+ TAnyCancelationToken() = default;
+
+ template <class TToken>
+ requires (!std::same_as<TAnyCancelationToken, std::remove_cvref_t<TToken>> &&
+ CCancelationToken<std::remove_cvref_t<TToken>>)
+ TAnyCancelationToken(TToken&& token);
+
+ TAnyCancelationToken(TAnyCancelationToken&& other) noexcept;
+ TAnyCancelationToken& operator= (TAnyCancelationToken&& other) noexcept;
+
+ TAnyCancelationToken(const TAnyCancelationToken& other) noexcept;
+ TAnyCancelationToken& operator= (const TAnyCancelationToken& other) noexcept;
+
+ explicit operator bool() const noexcept;
+
+ ~TAnyCancelationToken();
+
+ bool IsCancelationRequested() const noexcept;
+ const TError& GetCancelationError() const noexcept;
+
+private:
+ TStorage Storage_ = {};
+ TVTable* VTable_ = nullptr;
+
+ void Reset() noexcept;
+
+ template <class TToken>
+ void Set(TToken&& token);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TCurrentCancelationTokenGuard
+{
+public:
+ explicit TCurrentCancelationTokenGuard(TAnyCancelationToken nextToken);
+ ~TCurrentCancelationTokenGuard();
+
+ TCurrentCancelationTokenGuard(const TCurrentCancelationTokenGuard& other) = delete;
+ TCurrentCancelationTokenGuard& operator= (const TCurrentCancelationTokenGuard& other) = delete;
+
+private:
+ TAnyCancelationToken PrevToken_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCurrentCancelationTokenGuard MakeFutureCurrentTokenGuard(void* opaqueFutureState);
+TCurrentCancelationTokenGuard MakeCancelableContextCurrentTokenGuard(const TCancelableContextPtr& context);
+
+////////////////////////////////////////////////////////////////////////////////
+
+const TAnyCancelationToken& GetCurrentCancelationToken();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
+
+#define CANCELATION_TOKEN_INL_H_
+#include "cancelation_token-inl.h"
+#undef CANCELATION_TOKEN_INL_H_
diff --git a/yt/yt/core/actions/future-inl.h b/yt/yt/core/actions/future-inl.h
index 6ed80a4458..f2a1a9930f 100644
--- a/yt/yt/core/actions/future-inl.h
+++ b/yt/yt/core/actions/future-inl.h
@@ -6,6 +6,8 @@
#undef FUTURE_INL_H_
#include "bind.h"
+#include "cancelation_token.h"
+#include "invoker_util.h"
#include <yt/yt/core/concurrency/delayed_executor.h>
#include <yt/yt/core/concurrency/thread_affinity.h>
@@ -40,21 +42,38 @@ namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
-inline NYT::TError MakeAbandonedError()
+inline TError WrapIntoCancelationError(const TError& error)
{
- return NYT::TError(NYT::EErrorCode::Canceled, "Promise abandoned");
+ return TError(EErrorCode::Canceled, "Operation canceled")
+ << error;
}
-inline NYT::TError MakeCanceledError(const NYT::TError& error)
+inline TError TryExtractCancelationError()
{
- return NYT::TError(NYT::EErrorCode::Canceled, "Operation canceled")
- << error;
+ const auto& currentToken = GetCurrentCancelationToken();
+
+ if (currentToken.IsCancelationRequested()) {
+ // NB(arkady-e1ppa): This clown fiesta is present because some external
+ // users managed to both hardcode "Promise abandoned" error message
+ // as the retriable one (or expected in tests) and
+ // rely on their cancelation error to never be wrapped
+ // into anything with a different error code.
+ const auto& tokenError = currentToken.GetCancelationError();
+ return TError(tokenError.GetCode(), "Promise abandoned") << tokenError;
+ }
+
+ return TError(EErrorCode::Canceled, "Promise abandoned");
}
template <class T>
-TFuture<T> MakeWellKnownFuture(NYT::TErrorOr<T> value)
+TFuture<T> MakeWellKnownFuture(TErrorOr<T> value)
{
- return TFuture<T>(New<NYT::NDetail::TPromiseState<T>>(true, -1, -1, -1, std::move(value)));
+ return TFuture<T>(New<NDetail::TPromiseState<T>>(
+ /*wellKnown*/ true,
+ /*promiseRefCount*/ -1,
+ /*futureRefCount*/ -1,
+ /*cancelableRefCount*/ -1,
+ std::move(value)));
}
////////////////////////////////////////////////////////////////////////////////
@@ -314,6 +333,12 @@ public:
return Canceled_;
}
+ // Only safe to call after IsCanceled returned |true|.
+ const TError& GetCancelationError() const
+ {
+ return CancelationError_;
+ }
+
bool Wait(TDuration timeout) const;
bool Wait(TInstant deadline) const;
@@ -694,13 +719,18 @@ template <class T, class F>
void InterceptExceptions(const TPromise<T>& promise, const F& func)
{
try {
+ auto guard = MakeFutureCurrentTokenGuard(promise.Impl_.Get());
func();
} catch (const NYT::TErrorException& ex) {
promise.Set(ex.Error());
} catch (const std::exception& ex) {
promise.Set(NYT::TError(ex));
} catch (const NConcurrency::TFiberCanceledException&) {
- promise.Set(MakeAbandonedError());
+ if (auto error = promise.GetCancelationError(); !error.IsOK()) {
+ promise.Set(std::move(error));
+ } else {
+ promise.Set(TryExtractCancelationError());
+ }
}
}
@@ -1123,7 +1153,7 @@ TFuture<T> TFutureBase<T>::ToImmediatelyCancelable() const
promise.OnCanceled(BIND_NO_PROPAGATE([=, cancelable = AsCancelable()] (const NYT::TError& error) {
cancelable.Cancel(error);
- promise.TrySet(NYT::NDetail::MakeCanceledError(error));
+ promise.TrySet(NYT::NDetail::WrapIntoCancelationError(error));
}));
return promise;
@@ -1427,6 +1457,15 @@ bool TPromiseBase<T>::IsCanceled() const
}
template <class T>
+NYT::TError TPromiseBase<T>::GetCancelationError() const
+{
+ if (!IsCanceled()) {
+ return NYT::TError{};
+ }
+ return Impl_->GetCancelationError();
+}
+
+template <class T>
bool TPromiseBase<T>::OnCanceled(TCallback<void(const NYT::TError&)> handler) const
{
YT_ASSERT(Impl_);
@@ -1574,11 +1613,29 @@ struct TAsyncViaHelper<R(TArgs...)>
TArgs... args)
{
auto promise = NewPromise<TUnderlying>();
- invoker->Invoke(BIND_NO_PROPAGATE(
- &Inner,
- std::move(this_),
- promise,
- WrapToPassed(std::forward<TArgs>(args))...));
+ auto makeOnSuccess = [&] <size_t... Indeces> (std::index_sequence<Indeces...>) mutable {
+ return
+ [
+ promise,
+ this_ = std::move(this_),
+ // NB(arkady-e1ppa): TArgs = [std::tuple<...>] will cause
+ // copy/move ctor inference instead of a nested tuple if we don't
+ // write template arguments explicitly.
+ tuple = std::tuple<std::remove_reference_t<TArgs>...>(std::forward<TArgs>(args)...)
+ ] () mutable {
+ if constexpr (sizeof...(TArgs) == 0) {
+ Y_UNUSED(tuple);
+ }
+ Inner(std::move(this_), promise, std::forward<TArgs>(std::get<Indeces>(tuple))...);
+ };
+ };
+
+ GuardedInvoke(
+ invoker,
+ makeOnSuccess(std::make_index_sequence<sizeof...(TArgs)>()),
+ [promise] {
+ promise.Set(TryExtractCancelationError());
+ });
return promise;
}
@@ -1589,13 +1646,13 @@ struct TAsyncViaHelper<R(TArgs...)>
TArgs... args)
{
auto promise = NewPromise<TUnderlying>();
- auto makeOnSuccess = [&] <size_t... Indeces> (std::index_sequence<Indeces...>) {
+ auto makeOnSuccess = [&] <size_t... Indeces> (std::index_sequence<Indeces...>) mutable {
return
[
promise,
this_ = std::move(this_),
- tuple = std::tuple(std::forward<TArgs>(args)...)
- ] {
+ tuple = std::tuple<std::remove_reference_t<TArgs>...>(std::forward<TArgs>(args)...)
+ ] () mutable {
if constexpr (sizeof...(TArgs) == 0) {
Y_UNUSED(tuple);
}
diff --git a/yt/yt/core/actions/future.cpp b/yt/yt/core/actions/future.cpp
index 1dc54ca780..58d11312f1 100644
--- a/yt/yt/core/actions/future.cpp
+++ b/yt/yt/core/actions/future.cpp
@@ -73,7 +73,7 @@ bool TFutureState<void>::Cancel(const TError& error) noexcept
}
if (CancelHandlers_.empty()) {
- if (!TrySetError(NDetail::MakeCanceledError(error))) {
+ if (!TrySetError(NDetail::WrapIntoCancelationError(error))) {
return false;
}
} else {
@@ -150,7 +150,9 @@ void TFutureState<void>::InstallAbandonedError()
{
VERIFY_SPINLOCK_AFFINITY(SpinLock_);
if (AbandonedUnset_ && !Set_) {
- SetResultError(NDetail::MakeAbandonedError());
+ // NB(arkady-e1ppa): Once AbandonedUnset_ is |true| we are guaranteed
+ // to have a cancelation error.
+ SetResultError(CancelationError_);
Set_ = true;
}
}
@@ -230,12 +232,14 @@ void TFutureState<void>::OnLastPromiseRefLost()
}
// Another fast path: no subscribers.
- if ([&] {
+ auto cancelationError = TryExtractCancelationError();
+ if ([&] () noexcept {
auto guard = Guard(SpinLock_);
if (ReadyEvent_ || HasHandlers_ || Canceled_) {
return false;
}
YT_ASSERT(!AbandonedUnset_);
+ CancelationError_ = std::move(cancelationError);
AbandonedUnset_ = true;
// Cannot access this after UnrefFuture; in particular, cannot touch SpinLock_ in guard's dtor.
guard.Release();
@@ -247,9 +251,9 @@ void TFutureState<void>::OnLastPromiseRefLost()
}
// Slow path: notify the subscribers in a dedicated thread.
- GetFinalizerInvoker()->Invoke(BIND_NO_PROPAGATE([this] {
+ GetFinalizerInvoker()->Invoke(BIND_NO_PROPAGATE([this, error = std::move(cancelationError)] {
// Set the promise if the value is still missing.
- TrySetError(NDetail::MakeAbandonedError());
+ TrySetError(std::move(error));
// Kill the fake weak reference.
UnrefFuture();
}));
diff --git a/yt/yt/core/actions/future.h b/yt/yt/core/actions/future.h
index a90ae22b4f..768cf930ec 100644
--- a/yt/yt/core/actions/future.h
+++ b/yt/yt/core/actions/future.h
@@ -71,6 +71,9 @@ constexpr bool IsFuture = false;
template <class T>
constexpr bool IsFuture<TFuture<T>> = true;
+template <class U, class F>
+void InterceptExceptions(const TPromise<U>& promise, const F& func);
+
} // namespace NDetail
////////////////////////////////////////////////////////////////////////////////
@@ -470,6 +473,9 @@ public:
//! Checks if the promise is canceled.
bool IsCanceled() const;
+ //! Returns cancelation error if one is present.
+ TError GetCancelationError() const;
+
//! Attaches a cancellation handler.
/*!
* \param handler A callback to call when TFuture<T>::Cancel is triggered
@@ -498,6 +504,10 @@ protected:
friend void swap(TPromise<U>& lhs, TPromise<U>& rhs);
template <class U>
friend struct ::hash;
+ template <class U, class F>
+ friend void ::NYT::NDetail::InterceptExceptions(const TPromise<U>& promise, const F& func);
+
+ void TrySetCanceled(const TError& error);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/actions/unittests/cancelation_token_ut.cpp b/yt/yt/core/actions/unittests/cancelation_token_ut.cpp
new file mode 100644
index 0000000000..c271225ba6
--- /dev/null
+++ b/yt/yt/core/actions/unittests/cancelation_token_ut.cpp
@@ -0,0 +1,148 @@
+#include <gtest/gtest.h>
+
+#include <yt/yt/core/actions/cancelation_token.h>
+
+namespace NYT::NDetail {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TSimpleToken
+{
+ bool IsCancelationRequested() const noexcept
+ {
+ return !Error.IsOK();
+ }
+
+ const TError& GetCancelationError() const noexcept
+ {
+ return Error;
+ }
+
+ TError Error;
+
+ TSimpleToken()
+ {
+ ++CtorCount;
+ }
+
+ TSimpleToken(TError error)
+ : Error(std::move(error))
+ { }
+
+ TSimpleToken(const TSimpleToken& other)
+ : Error(other.Error)
+ {
+ ++CopyCount;
+ }
+
+ TSimpleToken& operator=(const TSimpleToken&) = default;
+ TSimpleToken& operator=(TSimpleToken&&) = default;
+
+ TSimpleToken(TSimpleToken&& other)
+ : Error(std::move(other.Error))
+ {
+ ++MoveCount;
+ }
+
+ ~TSimpleToken()
+ {
+ ++DtorCount;
+ }
+
+ static inline int CtorCount = 0;
+ static inline int DtorCount = 0;
+ static inline int CopyCount = 0;
+ static inline int MoveCount = 0;
+};
+
+static_assert(CCancelationToken<TSimpleToken>);
+
+////////////////////////////////////////////////////////////////////////////////
+
+void ResetCounters()
+{
+ TSimpleToken::CtorCount = 0;
+ TSimpleToken::DtorCount = 0;
+ TSimpleToken::CopyCount = 0;
+ TSimpleToken::MoveCount = 0;
+}
+
+TEST(TAnyTokenTest, JustWorks)
+{
+ ResetCounters();
+ TAnyCancelationToken any{TSimpleToken{}};
+ EXPECT_FALSE(any.IsCancelationRequested());
+}
+
+TEST(TAnyTokenTest, Copy)
+{
+ ResetCounters();
+ TSimpleToken token{TError(EErrorCode::Canceled, "Boo")};
+
+ TAnyCancelationToken any{token};
+ EXPECT_EQ(TSimpleToken::CtorCount, 0);
+ EXPECT_EQ(TSimpleToken::CopyCount, 1);
+ EXPECT_EQ(TSimpleToken::MoveCount, 0);
+
+ EXPECT_TRUE(any.IsCancelationRequested());
+
+ token.Error = TError{};
+
+ TAnyCancelationToken any1{};
+
+ // NB: Implicit move ctor.
+ any1 = token;
+ EXPECT_EQ(TSimpleToken::CtorCount, 0);
+ EXPECT_EQ(TSimpleToken::CopyCount, 2);
+ EXPECT_EQ(TSimpleToken::MoveCount, 1);
+ EXPECT_EQ(TSimpleToken::DtorCount, 1);
+ EXPECT_FALSE(any1.IsCancelationRequested());
+
+ any1 = any;
+ EXPECT_EQ(TSimpleToken::CtorCount, 0);
+ EXPECT_EQ(TSimpleToken::CopyCount, 3);
+ EXPECT_EQ(TSimpleToken::MoveCount, 1);
+ EXPECT_EQ(TSimpleToken::DtorCount, 2);
+ EXPECT_TRUE(any1.IsCancelationRequested());
+}
+
+TEST(TAnyTokenTest, MoveSmallToken)
+{
+ ResetCounters();
+ TSimpleToken token{TError(EErrorCode::Canceled, "Oi")};
+
+ TAnyCancelationToken any{std::move(token)};
+ EXPECT_EQ(TSimpleToken::CtorCount, 0);
+ EXPECT_EQ(TSimpleToken::CopyCount, 0);
+ EXPECT_EQ(TSimpleToken::MoveCount, 1);
+
+ EXPECT_TRUE(any.IsCancelationRequested());
+
+ token.Error = TError{};
+
+ TAnyCancelationToken any1{};
+ EXPECT_EQ(TSimpleToken::CtorCount, 0);
+ EXPECT_EQ(TSimpleToken::CopyCount, 0);
+ EXPECT_EQ(TSimpleToken::MoveCount, 1);
+ EXPECT_EQ(TSimpleToken::DtorCount, 0);
+
+ any1 = std::move(token);
+ EXPECT_EQ(TSimpleToken::CtorCount, 0);
+ EXPECT_EQ(TSimpleToken::CopyCount, 0);
+ EXPECT_EQ(TSimpleToken::MoveCount, 3);
+ EXPECT_EQ(TSimpleToken::DtorCount, 1);
+ EXPECT_FALSE(any1.IsCancelationRequested());
+
+ any1 = std::move(any);
+ EXPECT_EQ(TSimpleToken::CtorCount, 0);
+ EXPECT_EQ(TSimpleToken::CopyCount, 0);
+ EXPECT_EQ(TSimpleToken::MoveCount, 4);
+ EXPECT_EQ(TSimpleToken::DtorCount, 3);
+ EXPECT_TRUE(any1.IsCancelationRequested());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NDetail
diff --git a/yt/yt/core/actions/unittests/future_ut.cpp b/yt/yt/core/actions/unittests/future_ut.cpp
index b2d90bea48..158cb3be5b 100644
--- a/yt/yt/core/actions/unittests/future_ut.cpp
+++ b/yt/yt/core/actions/unittests/future_ut.cpp
@@ -4,6 +4,8 @@
#include <yt/yt/core/actions/future.h>
#include <yt/yt/core/actions/invoker_util.h>
+#include <yt/yt/core/concurrency/action_queue.h>
+
#include <yt/yt/core/misc/ref_counted_tracker.h>
#include <yt/yt/core/misc/mpsc_stack.h>
@@ -1410,16 +1412,6 @@ TEST_F(TFutureTest, AnyNCombinerDontPropagateCancelation)
EXPECT_FALSE(p2.IsCanceled());
}
-TEST_F(TFutureTest, AsyncViaCanceledInvoker)
-{
- auto context = New<TCancelableContext>();
- auto invoker = context->CreateInvoker(GetSyncInvoker());
- auto generator = BIND([] {}).AsyncVia(invoker);
- context->Cancel(TError("oops"));
- auto future = generator();
- auto error = future.Get();
- ASSERT_EQ(NYT::EErrorCode::Canceled, error.GetCode());
-}
////////////////////////////////////////////////////////////////////////////////
TEST_F(TFutureTest, LastPromiseDied)
@@ -1717,6 +1709,99 @@ TEST_F(TFutureTest, CancelAppliedToUncancellable)
EXPECT_TRUE(future1.Get().IsOK());
}
+TEST_F(TFutureTest, AsyncViaCanceledInvoker1)
+{
+ auto queue = New<NConcurrency::TActionQueue>();
+ auto context = New<TCancelableContext>();
+ auto invoker = context->CreateInvoker(queue->GetInvoker());
+
+ context->Cancel(TError(EErrorCode::Canceled, "From cancelable context!"));
+
+ auto error = WaitFor(BIND([] {}).AsyncVia(invoker).Run());
+
+ EXPECT_FALSE(error.IsOK());
+ EXPECT_EQ(error.GetCode(), EErrorCode::Canceled);
+ EXPECT_TRUE(NYT::ToString(error).Contains("From cancelable context!"))
+ << NYT::ToString(error);
+}
+
+TEST_F(TFutureTest, AsyncViaCanceledInvoker2)
+{
+ auto queue = New<NConcurrency::TActionQueue>();
+ auto context = New<TCancelableContext>();
+ auto invoker = context->CreateInvoker(queue->GetInvoker());
+
+ auto taskReady = NewPromise<void>();
+ auto promise = NewPromise<void>();
+
+ auto future = BIND([promise, taskReady, invoker] {
+ taskReady.Set();
+ WaitFor(promise.ToFuture(), invoker).ThrowOnError();
+ })
+ .AsyncVia(invoker)
+ .Run();
+
+ WaitFor(taskReady.ToFuture()).ThrowOnError();
+
+ context->Cancel(TError(EErrorCode::Canceled, "From cancelable context!"));
+ promise.Set();
+
+ auto error = WaitFor(future);
+
+ EXPECT_FALSE(error.IsOK());
+ EXPECT_EQ(error.GetCode(), EErrorCode::Canceled);
+ EXPECT_TRUE(NYT::ToString(error).Contains("From cancelable context!"))
+ << NYT::ToString(error);
+}
+
+TEST_F(TFutureTest, YT_12720)
+{
+ auto aqueue = New<NConcurrency::TActionQueue>();
+ auto invoker = aqueue->GetInvoker();
+ auto leash = NewPromise<void>();
+ auto taskStarted = NewPromise<void>();
+ auto future = BIND([leash, taskStarted] {
+ taskStarted.Set();
+ WaitFor(leash.ToFuture()).ThrowOnError();
+ }).AsyncVia(invoker).Run();
+
+ WaitFor(taskStarted.ToFuture()).ThrowOnError();
+
+ future.Cancel(NYT::TError(EErrorCode::Canceled, "Fiber canceled in .Reset() of TFiberGuard"));
+ auto error = WaitFor(future);
+ EXPECT_FALSE(error.IsOK());
+ EXPECT_EQ(error.GetCode(), EErrorCode::Canceled);
+ EXPECT_TRUE(NYT::ToString(error).Contains("Fiber canceled in .Reset() of TFiberGuard"))
+ << NYT::ToString(error);
+}
+
+TEST_F(TFutureTest, DiscardInApply)
+{
+ auto aqueue = New<NConcurrency::TActionQueue>();
+ auto invoker = aqueue->GetInvoker();
+
+ auto promise = NewPromise<void>();
+ auto future = promise.ToFuture();
+
+ // NB(arkady-e1ppa): mutable is required to destructively move promise out of the
+ // closure thus forcing it to be destroyed inside the scope.
+ auto canceled = BIND([p = std::move(promise)] () mutable {
+ auto localPromise = std::move(p);
+ while (true) {
+ Yield();
+ }
+ }).AsyncVia(invoker).Run();
+
+ Sleep(std::chrono::seconds(1));
+
+ canceled.Cancel(TError(EErrorCode::Canceled, "Canceled!"));
+
+ auto error = WaitFor(future);
+ EXPECT_EQ(error.GetCode(), EErrorCode::Canceled);
+ EXPECT_TRUE(NYT::ToString(error).Contains("Canceled!"))
+ << NYT::ToString(error);
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace
diff --git a/yt/yt/core/actions/unittests/ya.make b/yt/yt/core/actions/unittests/ya.make
index 8a91ecd927..b1cf897388 100644
--- a/yt/yt/core/actions/unittests/ya.make
+++ b/yt/yt/core/actions/unittests/ya.make
@@ -11,6 +11,7 @@ PROTO_NAMESPACE(yt)
SRCS(
actions_ut.cpp
bind_ut.cpp
+ cancelation_token_ut.cpp
future_ut.cpp
invoker_ut.cpp
new_with_offloaded_dtor_ut.cpp
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index c2c3ccd3ed..ceb8a9b8fc 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -12,6 +12,7 @@ ENDIF()
NO_LTO()
SRCS(
+ actions/cancelation_token.cpp
actions/cancelable_context.cpp
actions/current_invoker.cpp
actions/future.cpp