aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordanilalexeev <danilalexeev@yandex-team.com>2024-03-25 17:25:24 +0300
committerdanilalexeev <danilalexeev@yandex-team.com>2024-03-25 17:41:52 +0300
commit00378f1a750e66e8b048858090968facf4fa3e13 (patch)
tree26f2002092922f831fc58b5eabb92cbcd75e14c5
parentdd273493de2ae585c934504307cd570284062023 (diff)
downloadydb-00378f1a750e66e8b048858090968facf4fa3e13.tar.gz
YT-21380: Add options to the future deadline cancellation method
95648922cfcf8c05f39993381c96571ce4dd9617
-rw-r--r--yt/yt/core/actions/future-inl.h26
-rw-r--r--yt/yt/core/actions/future.h24
-rw-r--r--yt/yt/core/rpc/roaming_channel.cpp4
3 files changed, 44 insertions, 10 deletions
diff --git a/yt/yt/core/actions/future-inl.h b/yt/yt/core/actions/future-inl.h
index 369d6f57fe..3b82586642 100644
--- a/yt/yt/core/actions/future-inl.h
+++ b/yt/yt/core/actions/future-inl.h
@@ -860,7 +860,11 @@ TFuture<R> ApplyUniqueHelper(TFutureBase<T> this_, TCallback<S> callback)
}
template <class T, class D>
-TFuture<T> ApplyTimeoutHelper(TFutureBase<T> this_, D timeoutOrDeadline, IInvokerPtr invoker)
+TFuture<T> ApplyTimeoutHelper(
+ TFutureBase<T> this_,
+ D timeoutOrDeadline,
+ TFutureTimeoutOptions options,
+ IInvokerPtr invoker)
{
auto promise = NewPromise<T>();
@@ -878,6 +882,9 @@ TFuture<T> ApplyTimeoutHelper(TFutureBase<T> this_, D timeoutOrDeadline, IInvoke
error = error << NYT::TErrorAttribute("deadline", timeoutOrDeadline);
}
}
+ if (!options.Error.IsOK()) {
+ error = options.Error << std::move(error);
+ }
promise.TrySet(error);
cancelable.Cancel(error);
}),
@@ -1125,7 +1132,10 @@ TFuture<T> TFutureBase<T>::ToImmediatelyCancelable() const
}
template <class T>
-TFuture<T> TFutureBase<T>::WithDeadline(TInstant deadline, IInvokerPtr invoker) const
+TFuture<T> TFutureBase<T>::WithDeadline(
+ TInstant deadline,
+ TFutureTimeoutOptions options,
+ IInvokerPtr invoker) const
{
YT_ASSERT(Impl_);
@@ -1133,11 +1143,14 @@ TFuture<T> TFutureBase<T>::WithDeadline(TInstant deadline, IInvokerPtr invoker)
return TFuture<T>(Impl_);
}
- return NYT::NDetail::ApplyTimeoutHelper(*this, deadline, std::move(invoker));
+ return NYT::NDetail::ApplyTimeoutHelper(*this, deadline, std::move(options), std::move(invoker));
}
template <class T>
-TFuture<T> TFutureBase<T>::WithTimeout(TDuration timeout, IInvokerPtr invoker) const
+TFuture<T> TFutureBase<T>::WithTimeout(
+ TDuration timeout,
+ TFutureTimeoutOptions options,
+ IInvokerPtr invoker) const
{
YT_ASSERT(Impl_);
@@ -1145,15 +1158,16 @@ TFuture<T> TFutureBase<T>::WithTimeout(TDuration timeout, IInvokerPtr invoker) c
return TFuture<T>(Impl_);
}
- return NYT::NDetail::ApplyTimeoutHelper(*this, timeout, std::move(invoker));
+ return NYT::NDetail::ApplyTimeoutHelper(*this, timeout, std::move(options), std::move(invoker));
}
template <class T>
TFuture<T> TFutureBase<T>::WithTimeout(
std::optional<TDuration> timeout,
+ TFutureTimeoutOptions options,
IInvokerPtr invoker) const
{
- return timeout ? WithTimeout(*timeout, std::move(invoker)) : TFuture<T>(Impl_);
+ return timeout ? WithTimeout(*timeout, std::move(options), std::move(invoker)) : TFuture<T>(Impl_);
}
template <class T>
diff --git a/yt/yt/core/actions/future.h b/yt/yt/core/actions/future.h
index 6656dd47e9..4baadc2f8c 100644
--- a/yt/yt/core/actions/future.h
+++ b/yt/yt/core/actions/future.h
@@ -164,6 +164,15 @@ constexpr TFutureCallbackCookie NullFutureCallbackCookie = -1;
////////////////////////////////////////////////////////////////////////////////
+struct TFutureTimeoutOptions
+{
+ //! If set to a non-trivial error, timeout or cancelation errors
+ //! are enveloped into this error.
+ TError Error;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
//! A base class for both TFuture<T> and its specialization TFuture<void>.
/*!
* The resulting value can be accessed by either subscribing (#Subscribe)
@@ -275,13 +284,22 @@ public:
//! Returns a future that is either set to an actual value (if the original one is set in timely manner)
//! or to |EErrorCode::Timeout| (in case the deadline is reached).
//! The timeout event is handled in #invoker (DelayedExecutor is null).
- TFuture<T> WithDeadline(TInstant deadline, IInvokerPtr invoker = nullptr) const;
+ TFuture<T> WithDeadline(
+ TInstant deadline,
+ TFutureTimeoutOptions options = {},
+ IInvokerPtr invoker = nullptr) const;
//! Returns a future that is either set to an actual value (if the original one is set in timely manner)
//! or to |EErrorCode::Timeout| (in case of timeout).
//! The timeout event is handled in #invoker (DelayedExecutor is null).
- TFuture<T> WithTimeout(TDuration timeout, IInvokerPtr invoker = nullptr) const;
- TFuture<T> WithTimeout(std::optional<TDuration> timeout, IInvokerPtr invoker = nullptr) const;
+ TFuture<T> WithTimeout(
+ TDuration timeout,
+ TFutureTimeoutOptions options = {},
+ IInvokerPtr invoker = nullptr) const;
+ TFuture<T> WithTimeout(
+ std::optional<TDuration> timeout,
+ TFutureTimeoutOptions options = {},
+ IInvokerPtr invoker = nullptr) const;
//! Chains the asynchronous computation with another one.
template <class R>
diff --git a/yt/yt/core/rpc/roaming_channel.cpp b/yt/yt/core/rpc/roaming_channel.cpp
index 47c9767c8f..2c2bccc733 100644
--- a/yt/yt/core/rpc/roaming_channel.cpp
+++ b/yt/yt/core/rpc/roaming_channel.cpp
@@ -27,7 +27,9 @@ public:
, StartTime_(TInstant::Now())
{
if (Options_.Timeout) {
- asyncChannel = asyncChannel.WithTimeout(*Options_.Timeout);
+ asyncChannel = asyncChannel.WithTimeout(*Options_.Timeout, TFutureTimeoutOptions{
+ .Error = TError("Error getting channel")
+ });
}
asyncChannel.Subscribe(BIND(&TRoamingRequestControl::OnGotChannel, MakeStrong(this)));