diff options
author | danilalexeev <danilalexeev@yandex-team.com> | 2024-03-25 17:25:24 +0300 |
---|---|---|
committer | danilalexeev <danilalexeev@yandex-team.com> | 2024-03-25 17:41:52 +0300 |
commit | 00378f1a750e66e8b048858090968facf4fa3e13 (patch) | |
tree | 26f2002092922f831fc58b5eabb92cbcd75e14c5 | |
parent | dd273493de2ae585c934504307cd570284062023 (diff) | |
download | ydb-00378f1a750e66e8b048858090968facf4fa3e13.tar.gz |
YT-21380: Add options to the future deadline cancellation method
95648922cfcf8c05f39993381c96571ce4dd9617
-rw-r--r-- | yt/yt/core/actions/future-inl.h | 26 | ||||
-rw-r--r-- | yt/yt/core/actions/future.h | 24 | ||||
-rw-r--r-- | yt/yt/core/rpc/roaming_channel.cpp | 4 |
3 files changed, 44 insertions, 10 deletions
diff --git a/yt/yt/core/actions/future-inl.h b/yt/yt/core/actions/future-inl.h index 369d6f57fe..3b82586642 100644 --- a/yt/yt/core/actions/future-inl.h +++ b/yt/yt/core/actions/future-inl.h @@ -860,7 +860,11 @@ TFuture<R> ApplyUniqueHelper(TFutureBase<T> this_, TCallback<S> callback) } template <class T, class D> -TFuture<T> ApplyTimeoutHelper(TFutureBase<T> this_, D timeoutOrDeadline, IInvokerPtr invoker) +TFuture<T> ApplyTimeoutHelper( + TFutureBase<T> this_, + D timeoutOrDeadline, + TFutureTimeoutOptions options, + IInvokerPtr invoker) { auto promise = NewPromise<T>(); @@ -878,6 +882,9 @@ TFuture<T> ApplyTimeoutHelper(TFutureBase<T> this_, D timeoutOrDeadline, IInvoke error = error << NYT::TErrorAttribute("deadline", timeoutOrDeadline); } } + if (!options.Error.IsOK()) { + error = options.Error << std::move(error); + } promise.TrySet(error); cancelable.Cancel(error); }), @@ -1125,7 +1132,10 @@ TFuture<T> TFutureBase<T>::ToImmediatelyCancelable() const } template <class T> -TFuture<T> TFutureBase<T>::WithDeadline(TInstant deadline, IInvokerPtr invoker) const +TFuture<T> TFutureBase<T>::WithDeadline( + TInstant deadline, + TFutureTimeoutOptions options, + IInvokerPtr invoker) const { YT_ASSERT(Impl_); @@ -1133,11 +1143,14 @@ TFuture<T> TFutureBase<T>::WithDeadline(TInstant deadline, IInvokerPtr invoker) return TFuture<T>(Impl_); } - return NYT::NDetail::ApplyTimeoutHelper(*this, deadline, std::move(invoker)); + return NYT::NDetail::ApplyTimeoutHelper(*this, deadline, std::move(options), std::move(invoker)); } template <class T> -TFuture<T> TFutureBase<T>::WithTimeout(TDuration timeout, IInvokerPtr invoker) const +TFuture<T> TFutureBase<T>::WithTimeout( + TDuration timeout, + TFutureTimeoutOptions options, + IInvokerPtr invoker) const { YT_ASSERT(Impl_); @@ -1145,15 +1158,16 @@ TFuture<T> TFutureBase<T>::WithTimeout(TDuration timeout, IInvokerPtr invoker) c return TFuture<T>(Impl_); } - return NYT::NDetail::ApplyTimeoutHelper(*this, timeout, std::move(invoker)); + return NYT::NDetail::ApplyTimeoutHelper(*this, timeout, std::move(options), std::move(invoker)); } template <class T> TFuture<T> TFutureBase<T>::WithTimeout( std::optional<TDuration> timeout, + TFutureTimeoutOptions options, IInvokerPtr invoker) const { - return timeout ? WithTimeout(*timeout, std::move(invoker)) : TFuture<T>(Impl_); + return timeout ? WithTimeout(*timeout, std::move(options), std::move(invoker)) : TFuture<T>(Impl_); } template <class T> diff --git a/yt/yt/core/actions/future.h b/yt/yt/core/actions/future.h index 6656dd47e9..4baadc2f8c 100644 --- a/yt/yt/core/actions/future.h +++ b/yt/yt/core/actions/future.h @@ -164,6 +164,15 @@ constexpr TFutureCallbackCookie NullFutureCallbackCookie = -1; //////////////////////////////////////////////////////////////////////////////// +struct TFutureTimeoutOptions +{ + //! If set to a non-trivial error, timeout or cancelation errors + //! are enveloped into this error. + TError Error; +}; + +//////////////////////////////////////////////////////////////////////////////// + //! A base class for both TFuture<T> and its specialization TFuture<void>. /*! * The resulting value can be accessed by either subscribing (#Subscribe) @@ -275,13 +284,22 @@ public: //! Returns a future that is either set to an actual value (if the original one is set in timely manner) //! or to |EErrorCode::Timeout| (in case the deadline is reached). //! The timeout event is handled in #invoker (DelayedExecutor is null). - TFuture<T> WithDeadline(TInstant deadline, IInvokerPtr invoker = nullptr) const; + TFuture<T> WithDeadline( + TInstant deadline, + TFutureTimeoutOptions options = {}, + IInvokerPtr invoker = nullptr) const; //! Returns a future that is either set to an actual value (if the original one is set in timely manner) //! or to |EErrorCode::Timeout| (in case of timeout). //! The timeout event is handled in #invoker (DelayedExecutor is null). - TFuture<T> WithTimeout(TDuration timeout, IInvokerPtr invoker = nullptr) const; - TFuture<T> WithTimeout(std::optional<TDuration> timeout, IInvokerPtr invoker = nullptr) const; + TFuture<T> WithTimeout( + TDuration timeout, + TFutureTimeoutOptions options = {}, + IInvokerPtr invoker = nullptr) const; + TFuture<T> WithTimeout( + std::optional<TDuration> timeout, + TFutureTimeoutOptions options = {}, + IInvokerPtr invoker = nullptr) const; //! Chains the asynchronous computation with another one. template <class R> diff --git a/yt/yt/core/rpc/roaming_channel.cpp b/yt/yt/core/rpc/roaming_channel.cpp index 47c9767c8f..2c2bccc733 100644 --- a/yt/yt/core/rpc/roaming_channel.cpp +++ b/yt/yt/core/rpc/roaming_channel.cpp @@ -27,7 +27,9 @@ public: , StartTime_(TInstant::Now()) { if (Options_.Timeout) { - asyncChannel = asyncChannel.WithTimeout(*Options_.Timeout); + asyncChannel = asyncChannel.WithTimeout(*Options_.Timeout, TFutureTimeoutOptions{ + .Error = TError("Error getting channel") + }); } asyncChannel.Subscribe(BIND(&TRoamingRequestControl::OnGotChannel, MakeStrong(this))); |