diff options
author | danilalexeev <danilalexeev@yandex-team.com> | 2025-01-10 21:38:02 +0300 |
---|---|---|
committer | danilalexeev <danilalexeev@yandex-team.com> | 2025-01-10 22:14:42 +0300 |
commit | f07c2e564836ce426122ad9969111dc4dfb09cc7 (patch) | |
tree | dfbe2d179bea48ab9a7a3fc897f703b7d67df55a | |
parent | c8a9d5ee43af5a4d6091b81c37726a752b5d94ea (diff) | |
download | ydb-f07c2e564836ce426122ad9969111dc4dfb09cc7.tar.gz |
YT-23734: Exponential backoffs in retrying channel
Introduce an exponential retry backoff policy in TRetryingChannel instead of a constant one. Modify testing config files with an idea to keep the same total retry time.
commit_hash:7c43b268d22797eafc0722d3ae8f1bd32d03a599
-rw-r--r-- | yt/yt/core/misc/config.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/rpc/config.cpp | 4 | ||||
-rw-r--r-- | yt/yt/core/rpc/config.h | 8 | ||||
-rw-r--r-- | yt/yt/core/rpc/retrying_channel.cpp | 48 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/rpc_ut.cpp | 4 |
5 files changed, 59 insertions, 11 deletions
diff --git a/yt/yt/core/misc/config.cpp b/yt/yt/core/misc/config.cpp index bce3842cc7..2e26b8f5be 100644 --- a/yt/yt/core/misc/config.cpp +++ b/yt/yt/core/misc/config.cpp @@ -145,6 +145,12 @@ void TExponentialBackoffOptionsSerializer::Register(TRegistrar registrar) registrar.ExternalClassParameter("backoff_jitter", &TThat::BackoffJitter) .Default(TThat::DefaultBackoffJitter); + + registrar.ExternalPostprocessor([] (TThat* config) { + if(config->MinBackoff > config->MaxBackoff) { + THROW_ERROR_EXCEPTION("\"min_backoff\" must be less or equal than \"max_backoff\""); + } + }); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 5e99e1baf3..2ddb306d6b 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -141,6 +141,10 @@ void TRetryingChannelConfig::Register(TRegistrar registrar) registrar.Parameter("retry_attempts", &TThis::RetryAttempts) .GreaterThanOrEqual(1) .Default(10); + registrar.Parameter("enable_exponential_retry_backoffs", &TThis::EnableExponentialRetryBackoffs) + .Default(false); + registrar.Parameter("retry_backoff", &TThis::RetryBackoff) + .Default(); registrar.Parameter("retry_timeout", &TThis::RetryTimeout) .GreaterThanOrEqual(TDuration::Zero()) .Default(); diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index 92f9de08c6..2d7371bd91 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -8,6 +8,8 @@ #include <yt/yt/core/concurrency/config.h> +#include <yt/yt/core/misc/backoff_strategy.h> + #include <library/cpp/yt/misc/enum.h> #include <vector> @@ -182,6 +184,12 @@ public: //! Maximum number of retry attempts to make. int RetryAttempts; + // COMPAT(danilalexeev): YT-23734. + bool EnableExponentialRetryBackoffs; + + //! Retry backoff policy. + TExponentialBackoffOptions RetryBackoff; + //! Maximum time to spend while retrying. //! If null then no limit is enforced. std::optional<TDuration> RetryTimeout; diff --git a/yt/yt/core/rpc/retrying_channel.cpp b/yt/yt/core/rpc/retrying_channel.cpp index 2de3963d44..85a56da1c6 100644 --- a/yt/yt/core/rpc/retrying_channel.cpp +++ b/yt/yt/core/rpc/retrying_channel.cpp @@ -85,6 +85,7 @@ private: , ResponseHandler_(std::move(responseHandler)) , Options_(options) , RetryChecker_(std::move(retryChecker)) + , BackoffStrategy_(Config_->RetryBackoff) { YT_ASSERT(Config_); YT_ASSERT(UnderlyingChannel_); @@ -190,6 +191,7 @@ private: const TCallback<bool(const TError&)> RetryChecker_; const TRetryingRequestControlThunkPtr RequestControlThunk_ = New<TRetryingRequestControlThunk>(); + TBackoffStrategy BackoffStrategy_; //! The current attempt number (1-based). int CurrentAttempt_ = 1; TInstant Deadline_; @@ -210,10 +212,19 @@ private: void HandleError(TError error) override { - YT_LOG_DEBUG(error, "Request attempt failed (RequestId: %v, Attempt: %v of %v)", + YT_LOG_DEBUG(error, "Request attempt failed (RequestId: %v, Attempt: %v)", Request_->GetRequestId(), - CurrentAttempt_, - Config_->RetryAttempts); + MakeFormatterWrapper([&] (auto* builder) { + if (Config_->EnableExponentialRetryBackoffs) { + builder->AppendFormat("%v of %v", + BackoffStrategy_.GetInvocationIndex() + 1, + BackoffStrategy_.GetInvocationCount()); + } else { + builder->AppendFormat("%v of %v", + CurrentAttempt_, + Config_->RetryAttempts); + } + })); if (!RetryChecker_.Run(error)) { ResponseHandler_->HandleError(std::move(error)); @@ -276,15 +287,25 @@ private: void Retry() { - int count = ++CurrentAttempt_; - if (count > Config_->RetryAttempts || TInstant::Now() + Config_->RetryBackoffTime > Deadline_) { + auto retryAttemptsExhausted = false; + auto backoffTime = TDuration::Zero(); + if (Config_->EnableExponentialRetryBackoffs) { + retryAttemptsExhausted = !BackoffStrategy_.Next(); + backoffTime = BackoffStrategy_.GetBackoff(); + } else { + auto count = ++CurrentAttempt_; + retryAttemptsExhausted = count > Config_->RetryAttempts; + backoffTime = Config_->RetryBackoffTime; + } + + if (retryAttemptsExhausted || TInstant::Now() + backoffTime > Deadline_) { ReportError(TError(NRpc::EErrorCode::Unavailable, "Request retries failed")); return; } TDelayedExecutor::Submit( BIND(&TRetryingRequest::DoRetry, MakeStrong(this)), - Config_->RetryBackoffTime, + backoffTime, TDispatcher::Get()->GetHeavyInvoker()); } @@ -305,7 +326,7 @@ private: void DoSend() { - YT_LOG_DEBUG("Request attempt started (RequestId: %v, Method: %v.%v, %v%vAttempt: %v of %v, RequestTimeout: %v, RetryTimeout: %v)", + YT_LOG_DEBUG("Request attempt started (RequestId: %v, Method: %v.%v, %v%vAttempt: %v, RequestTimeout: %v, RetryTimeout: %v)", Request_->GetRequestId(), Request_->GetService(), Request_->GetMethod(), @@ -319,8 +340,17 @@ private: builder->AppendFormat("UserTag: %v, ", Request_->GetUserTag()); } }), - CurrentAttempt_, - Config_->RetryAttempts, + MakeFormatterWrapper([&] (auto* builder) { + if (Config_->EnableExponentialRetryBackoffs) { + builder->AppendFormat("%v of %v", + BackoffStrategy_.GetInvocationIndex() + 1, + BackoffStrategy_.GetInvocationCount()); + } else { + builder->AppendFormat("%v of %v", + CurrentAttempt_, + Config_->RetryAttempts); + } + }), Options_.Timeout, Config_->RetryTimeout); diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp index 7b64dd6da9..da19d392a6 100644 --- a/yt/yt/core/rpc/unittests/rpc_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp @@ -77,7 +77,7 @@ TYPED_TEST(TRpcTest, RetryingSend) { auto config = New<TRetryingChannelConfig>(); config->Load(ConvertTo<INodePtr>(TYsonString(TStringBuf( - "{retry_backoff_time=10}")))); + "{enable_exponential_retry_backoffs=true;retry_backoff={min_backoff=10}}")))); IChannelPtr channel = CreateRetryingChannel( std::move(config), @@ -129,7 +129,7 @@ TYPED_TEST(TNotUdsTest, Address) { auto config = New<TRetryingChannelConfig>(); config->Load(ConvertTo<INodePtr>(TYsonString(TStringBuf( - "{retry_backoff_time=10}")))); + "{enable_exponential_retry_backoffs=true;retry_backoff={min_backoff=10}}")))); testChannel(CreateRetryingChannel( std::move(config), this->CreateChannel())); |