diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-16 14:31:05 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-16 14:31:05 +0300 |
commit | 91baa58196a4f481f246c747dc20c9b25ff96cb4 (patch) | |
tree | d94e4d04f452da6d8bcda3fde8ba2e1c6707a583 | |
parent | c1dea0408f5bf27e7e79d0439cddb420bec298f5 (diff) | |
download | ydb-91baa58196a4f481f246c747dc20c9b25ff96cb4.tar.gz |
YQ-842 Move IRetryPolicy from PQ SDK to library
ref:6f3f663c57e22b6d97bfc34fb795be48ad8e6ec0
17 files changed, 438 insertions, 246 deletions
diff --git a/library/cpp/retry/retry_policy.h b/library/cpp/retry/retry_policy.h new file mode 100644 index 0000000000..a2f94840b8 --- /dev/null +++ b/library/cpp/retry/retry_policy.h @@ -0,0 +1,289 @@ +#pragma once +#include <util/datetime/base.h> +#include <util/generic/maybe.h> +#include <util/generic/typetraits.h> +#include <util/random/random.h> + +#include <functional> +#include <limits> +#include <memory> + +//! Retry policy. +//! Calculates delay before next retry (if any). +//! Has several default implementations: +//! - exponential backoff policy; +//! - retries with fixed interval; +//! - no retries. + +enum class ERetryErrorClass { + // This error shouldn't be retried. + NoRetry, + + // This error could be retried in short period of time. + ShortRetry, + + // This error requires waiting before it could be retried. + LongRetry, +}; + +template <class... TArgs> +struct IRetryPolicy { + using TPtr = std::shared_ptr<IRetryPolicy>; + + using TRetryClassFunction = std::function<ERetryErrorClass(typename TTypeTraits<TArgs>::TFuncParam...)>; + + //! Retry state of single request. + struct IRetryState { + using TPtr = std::unique_ptr<IRetryState>; + + virtual ~IRetryState() = default; + + //! Calculate delay before next retry if next retry is allowed. + //! Returns empty maybe if retry is not allowed anymore. + virtual TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) = 0; + }; + + virtual ~IRetryPolicy() = default; + + //! Function that is called after first error + //! to find out a futher retry behaviour. + //! Retry state is expected to be created for the whole single retry session. + virtual typename IRetryState::TPtr CreateRetryState() const = 0; + + //! + //! Default implementations. + //! + + static TPtr GetNoRetryPolicy(); // Denies all kind of retries. + + //! Randomized exponential backoff policy. + static TPtr GetExponentialBackoffPolicy(TRetryClassFunction retryClassFunction, + TDuration minDelay = TDuration::MilliSeconds(10), + // Delay for statuses that require waiting before retry (such as OVERLOADED). + TDuration minLongRetryDelay = TDuration::MilliSeconds(200), + TDuration maxDelay = TDuration::Seconds(30), + size_t maxRetries = std::numeric_limits<size_t>::max(), + TDuration maxTime = TDuration::Max(), + double scaleFactor = 2.0); + + //! Randomized fixed interval policy. + static TPtr GetFixedIntervalPolicy(TRetryClassFunction retryClassFunction, + TDuration delay = TDuration::MilliSeconds(100), + // Delay for statuses that require waiting before retry (such as OVERLOADED). + TDuration longRetryDelay = TDuration::MilliSeconds(300), + size_t maxRetries = std::numeric_limits<size_t>::max(), + TDuration maxTime = TDuration::Max()); +}; + +template <class... TArgs> +struct TNoRetryPolicy : IRetryPolicy<TArgs...> { + using IRetryState = typename IRetryPolicy<TArgs...>::IRetryState; + + struct TNoRetryState : IRetryState { + TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam...) override { + return Nothing(); + } + }; + + typename IRetryState::TPtr CreateRetryState() const override { + return std::make_unique<TNoRetryState>(); + } +}; + +namespace NRetryDetails { +inline TDuration RandomizeDelay(TDuration baseDelay) { + const TDuration::TValue half = baseDelay.GetValue() / 2; + return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half)); +} +} // namespace NRetryDetails + +template <class... TArgs> +struct TExponentialBackoffPolicy : IRetryPolicy<TArgs...> { + using IRetryPolicy = IRetryPolicy<TArgs...>; + using IRetryState = typename IRetryPolicy::IRetryState; + + struct TExponentialBackoffState : IRetryState { + TExponentialBackoffState(typename IRetryPolicy::TRetryClassFunction retryClassFunction, + TDuration minDelay, + TDuration minLongRetryDelay, + TDuration maxDelay, + size_t maxRetries, + TDuration maxTime, + double scaleFactor) + : MinLongRetryDelay(minLongRetryDelay) + , MaxDelay(maxDelay) + , MaxRetries(maxRetries) + , MaxTime(maxTime) + , ScaleFactor(scaleFactor) + , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero()) + , CurrentDelay(minDelay) + , AttemptsDone(0) + , RetryClassFunction(std::move(retryClassFunction)) + { + } + + TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override { + const ERetryErrorClass errorClass = RetryClassFunction(args...); + if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) { + return Nothing(); + } + + if (errorClass == ERetryErrorClass::LongRetry) { + CurrentDelay = Max(CurrentDelay, MinLongRetryDelay); + } + + const TDuration delay = NRetryDetails::RandomizeDelay(CurrentDelay); + + if (CurrentDelay < MaxDelay) { + CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay); + } + + ++AttemptsDone; + return delay; + } + + const TDuration MinLongRetryDelay; + const TDuration MaxDelay; + const size_t MaxRetries; + const TDuration MaxTime; + const double ScaleFactor; + const TInstant StartTime; + TDuration CurrentDelay; + size_t AttemptsDone; + typename IRetryPolicy::TRetryClassFunction RetryClassFunction; + }; + + TExponentialBackoffPolicy(typename IRetryPolicy::TRetryClassFunction retryClassFunction, + TDuration minDelay, + TDuration minLongRetryDelay, + TDuration maxDelay, + size_t maxRetries, + TDuration maxTime, + double scaleFactor) + : MinDelay(minDelay) + , MinLongRetryDelay(minLongRetryDelay) + , MaxDelay(maxDelay) + , MaxRetries(maxRetries) + , MaxTime(maxTime) + , ScaleFactor(scaleFactor) + , RetryClassFunction(std::move(retryClassFunction)) + { + Y_ASSERT(RetryClassFunction); + Y_ASSERT(MinDelay < MaxDelay); + Y_ASSERT(MinLongRetryDelay < MaxDelay); + Y_ASSERT(MinLongRetryDelay >= MinDelay); + Y_ASSERT(ScaleFactor > 1.0); + Y_ASSERT(MaxRetries > 0); + Y_ASSERT(MaxTime > MinDelay); + } + + typename IRetryState::TPtr CreateRetryState() const override { + return std::make_unique<TExponentialBackoffState>(RetryClassFunction, MinDelay, MinLongRetryDelay, MaxDelay, MaxRetries, MaxTime, ScaleFactor); + } + + const TDuration MinDelay; + const TDuration MinLongRetryDelay; + const TDuration MaxDelay; + const size_t MaxRetries; + const TDuration MaxTime; + const double ScaleFactor; + typename IRetryPolicy::TRetryClassFunction RetryClassFunction; +}; + +template <class... TArgs> +struct TFixedIntervalPolicy : IRetryPolicy<TArgs...> { + using IRetryPolicy = IRetryPolicy<TArgs...>; + using IRetryState = typename IRetryPolicy::IRetryState; + + struct TFixedIntervalState : IRetryState { + TFixedIntervalState(typename IRetryPolicy::TRetryClassFunction retryClassFunction, + TDuration delay, + TDuration longRetryDelay, + size_t maxRetries, + TDuration maxTime) + : Delay(delay) + , LongRetryDelay(longRetryDelay) + , MaxRetries(maxRetries) + , MaxTime(maxTime) + , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero()) + , AttemptsDone(0) + , RetryClassFunction(std::move(retryClassFunction)) + { + } + + TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override { + const ERetryErrorClass errorClass = RetryClassFunction(args...); + if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) { + return Nothing(); + } + + const TDuration delay = NRetryDetails::RandomizeDelay(errorClass == ERetryErrorClass::LongRetry ? LongRetryDelay : Delay); + + ++AttemptsDone; + return delay; + } + + const TDuration Delay; + const TDuration LongRetryDelay; + const size_t MaxRetries; + const TDuration MaxTime; + const TInstant StartTime; + size_t AttemptsDone; + typename IRetryPolicy::TRetryClassFunction RetryClassFunction; + }; + + TFixedIntervalPolicy(typename IRetryPolicy::TRetryClassFunction retryClassFunction, + TDuration delay, + TDuration longRetryDelay, + size_t maxRetries, + TDuration maxTime) + : Delay(delay) + , LongRetryDelay(longRetryDelay) + , MaxRetries(maxRetries) + , MaxTime(maxTime) + , RetryClassFunction(std::move(retryClassFunction)) + { + Y_ASSERT(RetryClassFunction); + Y_ASSERT(MaxRetries > 0); + Y_ASSERT(MaxTime > Delay); + Y_ASSERT(MaxTime > LongRetryDelay); + Y_ASSERT(LongRetryDelay >= Delay); + } + + typename IRetryState::TPtr CreateRetryState() const override { + return std::make_unique<TFixedIntervalState>(RetryClassFunction, Delay, LongRetryDelay, MaxRetries, MaxTime); + } + + const TDuration Delay; + const TDuration LongRetryDelay; + const size_t MaxRetries; + const TDuration MaxTime; + typename IRetryPolicy::TRetryClassFunction RetryClassFunction; +}; + +template <class... TArgs> +typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetNoRetryPolicy() { + return std::make_shared<TNoRetryPolicy<TArgs...>>(); +} + +template <class... TArgs> +typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetExponentialBackoffPolicy(TRetryClassFunction retryClassFunction, + TDuration minDelay, + TDuration minLongRetryDelay, + TDuration maxDelay, + size_t maxRetries, + TDuration maxTime, + double scaleFactor) +{ + return std::make_shared<TExponentialBackoffPolicy<TArgs...>>(std::move(retryClassFunction), minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor); +} + +template <class... TArgs> +typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetFixedIntervalPolicy(TRetryClassFunction retryClassFunction, + TDuration delay, + TDuration longRetryDelay, + size_t maxRetries, + TDuration maxTime) +{ + return std::make_shared<TFixedIntervalPolicy<TArgs...>>(std::move(retryClassFunction), delay, longRetryDelay, maxRetries, maxTime); +} diff --git a/library/cpp/retry/retry_policy_ut.cpp b/library/cpp/retry/retry_policy_ut.cpp new file mode 100644 index 0000000000..32a5896e08 --- /dev/null +++ b/library/cpp/retry/retry_policy_ut.cpp @@ -0,0 +1,108 @@ +#include "retry_policy.h" + +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(RetryPolicy) { + Y_UNIT_TEST(NoRetryPolicy) { + auto policy = IRetryPolicy<int>::GetNoRetryPolicy(); + UNIT_ASSERT(!policy->CreateRetryState()->GetNextRetryDelay(42)); + } + + using ITestPolicy = IRetryPolicy<ERetryErrorClass>; + + ERetryErrorClass ErrorClassFunction(ERetryErrorClass err) { + return err; + } + +#define ASSERT_INTERVAL(from, to, val) { \ + auto v = val; \ + UNIT_ASSERT(v); \ + UNIT_ASSERT_GE_C(*v, from, *v); \ + UNIT_ASSERT_LE_C(*v, to, *v); \ + } + + Y_UNIT_TEST(FixedIntervalPolicy) { + auto policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::Seconds(100)); + auto state = policy->CreateRetryState(); + for (int i = 0; i < 5; ++i) { + ASSERT_INTERVAL(TDuration::MilliSeconds(50), TDuration::MilliSeconds(100), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + ASSERT_INTERVAL(TDuration::Seconds(50), TDuration::Seconds(100), state->GetNextRetryDelay(ERetryErrorClass::LongRetry)); + UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::NoRetry)); + } + } + + Y_UNIT_TEST(ExponentialBackoffPolicy) { + auto policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::Seconds(100), TDuration::Seconds(500)); + auto state = policy->CreateRetryState(); + + // Step 1 + ASSERT_INTERVAL(TDuration::MilliSeconds(50), TDuration::MilliSeconds(100), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + + // Step 2 + ASSERT_INTERVAL(TDuration::Seconds(50), TDuration::Seconds(100), state->GetNextRetryDelay(ERetryErrorClass::LongRetry)); + + // Step 3 + ASSERT_INTERVAL(TDuration::Seconds(100), TDuration::Seconds(200), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + + // Step 4 + ASSERT_INTERVAL(TDuration::Seconds(200), TDuration::Seconds(400), state->GetNextRetryDelay(ERetryErrorClass::LongRetry)); + + // Step 5. Max delay + ASSERT_INTERVAL(TDuration::Seconds(250), TDuration::Seconds(500), state->GetNextRetryDelay(ERetryErrorClass::LongRetry)); + ASSERT_INTERVAL(TDuration::Seconds(250), TDuration::Seconds(500), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + + // No retry + UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::NoRetry)); + } + + void TestMaxRetries(bool exponentialBackoff) { + ITestPolicy::TPtr policy; + if (exponentialBackoff) { + policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 3); + } else { + policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::MilliSeconds(300), 3); + } + auto state = policy->CreateRetryState(); + UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + } + + void TestMaxTime(bool exponentialBackoff) { + ITestPolicy::TPtr policy; + const TDuration maxDelay = TDuration::Seconds(2); + if (exponentialBackoff) { + policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 100500, maxDelay); + } else { + policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::MilliSeconds(300), 100500, maxDelay); + } + const TInstant start = TInstant::Now(); + auto state = policy->CreateRetryState(); + for (int i = 0; i < 3; ++i) { + auto delay = state->GetNextRetryDelay(ERetryErrorClass::ShortRetry); + const TInstant now = TInstant::Now(); + UNIT_ASSERT(delay || now - start >= maxDelay); + } + Sleep(maxDelay); + UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry)); + } + + Y_UNIT_TEST(MaxRetriesExponentialBackoff) { + TestMaxRetries(true); + } + + Y_UNIT_TEST(MaxRetriesFixedInterval) { + TestMaxRetries(false); + } + + Y_UNIT_TEST(MaxTimeExponentialBackoff) { + TestMaxTime(true); + } + + Y_UNIT_TEST(MaxTimeFixedInterval) { + TestMaxTime(false); + } +} diff --git a/library/cpp/retry/ut/ya.make b/library/cpp/retry/ut/ya.make index ff8259bfdb..d423f2be6f 100644 --- a/library/cpp/retry/ut/ya.make +++ b/library/cpp/retry/ut/ya.make @@ -7,6 +7,7 @@ OWNER( ) SRCS( + retry_policy_ut.cpp retry_ut.cpp ) diff --git a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp index c5d6fa4fa2..0ebdbc4f70 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp +++ b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp @@ -136,7 +136,7 @@ public: if (!RetryState) { RetryState = NYdb::NPersQueue::IRetryPolicy::GetExponentialBackoffPolicy()->CreateRetryState(); } - TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status); + TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status.GetStatus()); if (status.GetStatus() == NYdb::EStatus::SCHEME_ERROR) { nextRetryDelay = Nothing(); // Not retryable } @@ -198,7 +198,7 @@ private: NYdb::TDriver YdbDriver; NYdb::NPersQueue::TPersQueueClient PqClient; ui64 Index = 0; - NYdb::NPersQueue::IRetryState::TPtr RetryState; + NYdb::NPersQueue::IRetryPolicy::IRetryState::TPtr RetryState; bool RequestInFlight = false; bool Finishing = false; }; diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp index b5c2e4d549..135527411e 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp +++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp @@ -129,7 +129,7 @@ public: 2.0 // scaleFactor )->CreateRetryState(); } - TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status); + TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status.GetStatus()); if (status.GetStatus() == NYdb::EStatus::SCHEME_ERROR) { nextRetryDelay = Nothing(); // No topic => OK. Leave just transient issues. } @@ -173,7 +173,7 @@ private: NYdb::NPersQueue::TPersQueueClient PqClient; ui64 Index = 0; const size_t MaxRetries; - NYdb::NPersQueue::IRetryState::TPtr RetryState; + NYdb::NPersQueue::IRetryPolicy::IRetryState::TPtr RetryState; }; // Actor for deletion of read rules for all topics in the query. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp index dd42c8c4ed..e0e9fd6857 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp @@ -4,7 +4,7 @@ namespace NYdb::NPersQueue { -IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) { +ERetryErrorClass GetRetryErrorClass(EStatus status) { switch (status) { case EStatus::SUCCESS: case EStatus::INTERNAL_ERROR: @@ -19,7 +19,7 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) { case EStatus::CLIENT_INTERNAL_ERROR: case EStatus::CLIENT_CANCELLED: case EStatus::CLIENT_OUT_OF_RANGE: - return IRetryPolicy::ERetryErrorClass::ShortRetry; + return ERetryErrorClass::ShortRetry; case EStatus::OVERLOADED: case EStatus::TIMEOUT: @@ -28,7 +28,7 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) { case EStatus::CLIENT_DEADLINE_EXCEEDED: case EStatus::CLIENT_LIMITS_REACHED: case EStatus::CLIENT_DISCOVERY_FAILED: - return IRetryPolicy::ERetryErrorClass::LongRetry; + return ERetryErrorClass::LongRetry; case EStatus::SCHEME_ERROR: case EStatus::STATUS_UNDEFINED: @@ -40,14 +40,14 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) { case EStatus::NOT_FOUND: case EStatus::CLIENT_UNAUTHENTICATED: case EStatus::CLIENT_CALL_UNIMPLEMENTED: - return IRetryPolicy::ERetryErrorClass::NoRetry; + return ERetryErrorClass::NoRetry; } } -IRetryPolicy::ERetryErrorClass GetRetryErrorClassV2(EStatus status) { +ERetryErrorClass GetRetryErrorClassV2(EStatus status) { switch (status) { case EStatus::SCHEME_ERROR: - return IRetryPolicy::ERetryErrorClass::NoRetry; + return ERetryErrorClass::NoRetry; default: return GetRetryErrorClass(status); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h index 7287b69894..8c140e2217 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h @@ -11,8 +11,8 @@ namespace NYdb::NPersQueue { -IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status); -IRetryPolicy::ERetryErrorClass GetRetryErrorClassV2(EStatus status); +ERetryErrorClass GetRetryErrorClass(EStatus status); +ERetryErrorClass GetRetryErrorClassV2(EStatus status); void Cancel(NGrpc::IQueueClientContextPtr& context); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp index a22ed23cea..aff01791b9 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp @@ -170,189 +170,13 @@ TAsyncDescribeTopicResult TPersQueueClient::DescribeTopic(const TString& path, c return Impl_->DescribeTopic(path, settings); } -namespace { - -struct TNoRetryState : IRetryState { - TMaybe<TDuration> GetNextRetryDelay(const TStatus&) override { - return Nothing(); - } -}; - -struct TNoRetryPolicy : IRetryPolicy { - IRetryState::TPtr CreateRetryState() const override { - return std::make_unique<TNoRetryState>(); - } -}; - -TDuration RandomizeDelay(TDuration baseDelay) { - const TDuration::TValue half = baseDelay.GetValue() / 2; - return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half)); -} - -struct TExponentialBackoffState : IRetryState { - TExponentialBackoffState(TDuration minDelay, - TDuration minLongRetryDelay, - TDuration maxDelay, - size_t maxRetries, - TDuration maxTime, - double scaleFactor, - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> retryErrorClassFunction) - : MinLongRetryDelay(minLongRetryDelay) - , MaxDelay(maxDelay) - , MaxRetries(maxRetries) - , MaxTime(maxTime) - , ScaleFactor(scaleFactor) - , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero()) - , CurrentDelay(minDelay) - , AttemptsDone(0) - , RetryErrorClassFunction(retryErrorClassFunction) - { - } - - TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) override { - const IRetryPolicy::ERetryErrorClass errorClass = RetryErrorClassFunction(status.GetStatus()); - if (AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime || errorClass == IRetryPolicy::ERetryErrorClass::NoRetry) { - return Nothing(); - } - - if (errorClass == IRetryPolicy::ERetryErrorClass::LongRetry) { - CurrentDelay = Max(CurrentDelay, MinLongRetryDelay); - } - - const TDuration delay = RandomizeDelay(CurrentDelay); - - if (CurrentDelay < MaxDelay) { - CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay); - } - - ++AttemptsDone; - return delay; - } - - const TDuration MinLongRetryDelay; - const TDuration MaxDelay; - const size_t MaxRetries; - const TDuration MaxTime; - const double ScaleFactor; - const TInstant StartTime; - TDuration CurrentDelay; - size_t AttemptsDone; - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction; -}; - -struct TExponentialBackoffPolicy : IRetryPolicy { - TExponentialBackoffPolicy(TDuration minDelay, - TDuration minLongRetryDelay, - TDuration maxDelay, - size_t maxRetries, - TDuration maxTime, - double scaleFactor, - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction) - : MinDelay(minDelay) - , MinLongRetryDelay(minLongRetryDelay) - , MaxDelay(maxDelay) - , MaxRetries(maxRetries) - , MaxTime(maxTime) - , ScaleFactor(scaleFactor) - , RetryErrorClassFunction(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass) - { - Y_ASSERT(MinDelay < MaxDelay); - Y_ASSERT(MinLongRetryDelay < MaxDelay); - Y_ASSERT(MinLongRetryDelay >= MinDelay); - Y_ASSERT(ScaleFactor > 1.0); - Y_ASSERT(MaxRetries > 0); - Y_ASSERT(MaxTime > MinDelay); - } - - IRetryState::TPtr CreateRetryState() const override { - return std::make_unique<TExponentialBackoffState>(MinDelay, MinLongRetryDelay, MaxDelay, MaxRetries, MaxTime, ScaleFactor, RetryErrorClassFunction); - } - - const TDuration MinDelay; - const TDuration MinLongRetryDelay; - const TDuration MaxDelay; - const size_t MaxRetries; - const TDuration MaxTime; - const double ScaleFactor; - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction; -}; - -struct TFixedIntervalState : IRetryState { - TFixedIntervalState(TDuration delay, - TDuration longRetryDelay, - size_t maxRetries, - TDuration maxTime, - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> retryErrorClassFunction) - : Delay(delay) - , LongRetryDelay(longRetryDelay) - , MaxRetries(maxRetries) - , MaxTime(maxTime) - , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero()) - , AttemptsDone(0) - , RetryErrorClassFunction(retryErrorClassFunction) - { - } - - TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) override { - const IRetryPolicy::ERetryErrorClass errorClass = RetryErrorClassFunction(status.GetStatus()); - if (AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime || errorClass == IRetryPolicy::ERetryErrorClass::NoRetry) { - return Nothing(); - } - - const TDuration delay = RandomizeDelay(errorClass == IRetryPolicy::ERetryErrorClass::LongRetry ? LongRetryDelay : Delay); - - ++AttemptsDone; - return delay; - } - - const TDuration Delay; - const TDuration LongRetryDelay; - const size_t MaxRetries; - const TDuration MaxTime; - const TInstant StartTime; - size_t AttemptsDone; - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction; -}; - -struct TFixedIntervalPolicy : IRetryPolicy { - TFixedIntervalPolicy(TDuration delay, - TDuration longRetryDelay, - size_t maxRetries, - TDuration maxTime, - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction) - : Delay(delay) - , LongRetryDelay(longRetryDelay) - , MaxRetries(maxRetries) - , MaxTime(maxTime) - , RetryErrorClassFunction(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass) - { - Y_ASSERT(MaxRetries > 0); - Y_ASSERT(MaxTime > Delay); - Y_ASSERT(MaxTime > LongRetryDelay); - Y_ASSERT(LongRetryDelay >= Delay); - } - - IRetryState::TPtr CreateRetryState() const override { - return std::make_unique<TFixedIntervalState>(Delay, LongRetryDelay, MaxRetries, MaxTime, RetryErrorClassFunction); - } - - const TDuration Delay; - const TDuration LongRetryDelay; - const size_t MaxRetries; - const TDuration MaxTime; - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction; -}; - -} // namespace - IRetryPolicy::TPtr IRetryPolicy::GetDefaultPolicy() { static IRetryPolicy::TPtr policy = GetExponentialBackoffPolicy(); return policy; } IRetryPolicy::TPtr IRetryPolicy::GetNoRetryPolicy() { - static IRetryPolicy::TPtr policy = std::make_shared<TNoRetryPolicy>(); - return policy; + return ::IRetryPolicy<EStatus>::GetNoRetryPolicy(); } IRetryPolicy::TPtr IRetryPolicy::GetExponentialBackoffPolicy(TDuration minDelay, @@ -361,18 +185,18 @@ IRetryPolicy::TPtr IRetryPolicy::GetExponentialBackoffPolicy(TDuration minDelay, size_t maxRetries, TDuration maxTime, double scaleFactor, - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction) + std::function<ERetryErrorClass(EStatus)> customRetryClassFunction) { - return std::make_shared<TExponentialBackoffPolicy>(minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor, customRetryClassFunction); + return ::IRetryPolicy<EStatus>::GetExponentialBackoffPolicy(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass, minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor); } IRetryPolicy::TPtr IRetryPolicy::GetFixedIntervalPolicy(TDuration delay, TDuration longRetryDelay, size_t maxRetries, TDuration maxTime, - std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction) + std::function<ERetryErrorClass(EStatus)> customRetryClassFunction) { - return std::make_shared<TFixedIntervalPolicy>(delay, longRetryDelay, maxRetries, maxTime, customRetryClassFunction); + return ::IRetryPolicy<EStatus>::GetFixedIntervalPolicy(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass, delay, longRetryDelay, maxRetries, maxTime); } std::shared_ptr<IReadSession> TPersQueueClient::CreateReadSession(const TReadSessionSettings& settings) { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index e7dd0a87e5..3b82d7ec0d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -233,7 +233,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu if (!ClusterDiscoveryRetryState) { ClusterDiscoveryRetryState = Settings.RetryPolicy_->CreateRetryState(); } - TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status); + TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status.GetStatus()); if (retryDelay) { Log << TLOG_INFO << "Cluster discovery request failed. Status: " << status.GetStatus() << ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\""; @@ -724,7 +724,7 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) { ServerMessage = std::make_shared<Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>(); ++ConnectionGeneration; if (RetryState) { - TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(TPlainStatus(status)); + TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status); if (nextDelay) { delay = *nextDelay; delayContext = ClientContext->CreateContext(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index b5b874954f..e08db09456 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -970,7 +970,7 @@ private: size_t ConnectionGeneration = 0; TAdaptiveLock Lock; IProcessor::TPtr Processor; - IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). + IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). size_t ConnectionAttemptsDone = 0; // Memory usage. @@ -1103,7 +1103,7 @@ private: std::shared_ptr<TReadSessionEventsQueue> EventsQueue; THashMap<TString, TClusterSessionInfo> ClusterSessions; // Cluster name (in lower case) -> TClusterSessionInfo NGrpc::IQueueClientContextPtr ClusterDiscoveryDelayContext; - IRetryState::TPtr ClusterDiscoveryRetryState; + IRetryPolicy::IRetryState::TPtr ClusterDiscoveryRetryState; bool DataReadingSuspended = false; NGrpc::IQueueClientContextPtr DumpCountersContext; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index af4e08b979..0a338cfc1d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -94,7 +94,7 @@ TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& stat if (!RetryState) { RetryState = Settings.RetryPolicy_->CreateRetryState(); } - nextDelay = RetryState->GetNextRetryDelay(TPlainStatus(status)); + nextDelay = RetryState->GetNextRetryDelay(status.Status); if (nextDelay) { result.StartDelay = *nextDelay; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h index 324550bb02..6f506a0080 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h @@ -387,7 +387,7 @@ private: size_t ConnectionAttemptsDone = 0; TAdaptiveLock Lock; IProcessor::TPtr Processor; - IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). + IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to. TString SessionId; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index ee9aa75a0a..ddb1b2d82b 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -4,6 +4,7 @@ #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/logger/log.h> +#include <library/cpp/retry/retry_policy.h> #include <util/datetime/base.h> #include <util/generic/hash.h> @@ -885,29 +886,8 @@ TString DebugString(const TReadSessionEvent::TEvent& event); //! - exponential backoff policy; //! - retries with fixed interval; //! - no retries. -//! TODO: move to common header (not persqueue). - -//! Retry state of single request. -struct IRetryState { - using TPtr = std::unique_ptr<IRetryState>; - - virtual ~IRetryState() = default; - - //! Calculate delay before next retry if next retry is allowed. - //! Returns empty maybe if retry is not allowed anymore. - virtual TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) = 0; -}; - -struct IRetryPolicy { - using TPtr = std::shared_ptr<IRetryPolicy>; - - virtual ~IRetryPolicy() = default; - - //! Function that is called after first error - //! to find out a futher retry behaviour. - //! Retry state is expected to be created for the whole single retry session. - virtual IRetryState::TPtr CreateRetryState() const = 0; +struct IRetryPolicy : ::IRetryPolicy<EStatus> { //! //! Default implementations. //! @@ -915,17 +895,6 @@ struct IRetryPolicy { static TPtr GetDefaultPolicy(); // Exponential backoff with infinite retry attempts. static TPtr GetNoRetryPolicy(); // Denies all kind of retries. - enum class ERetryErrorClass { - // This error shouldn't be retried. - NoRetry, - - // This error could be retried in short period of time. - ShortRetry, - - // This error requires waiting before it could be retried. - LongRetry, - }; - //! Randomized exponential backoff policy. static TPtr GetExponentialBackoffPolicy(TDuration minDelay = TDuration::MilliSeconds(10), // Delay for statuses that require waiting before retry (such as OVERLOADED). diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 9b2367e3f3..002dd47290 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -578,7 +578,7 @@ TReadSessionImplTestSetup::TReadSessionImplTestSetup() { Settings .AppendTopics({"TestTopic"}) .ConsumerName("TestConsumer") - .RetryPolicy(IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10))) + .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10))) .Counters(MakeIntrusive<NYdb::NPersQueue::TReaderCounters>(MakeIntrusive<NMonitoring::TDynamicCounters>())); Log.SetFormatter(GetPrefixLogFormatter("")); @@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { TReadSessionSettings settings = setup.GetReadSessionSettings(); // Set policy with max retries == 3. - settings.RetryPolicy(IRetryPolicy::GetExponentialBackoffPolicy(TDuration::MilliSeconds(10), TDuration::MilliSeconds(10), TDuration::MilliSeconds(100), 3)); + settings.RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetExponentialBackoffPolicy(TDuration::MilliSeconds(10), TDuration::MilliSeconds(10), TDuration::MilliSeconds(100), 3)); std::shared_ptr<IReadSession> session = setup.GetPersQueueClient().CreateReadSession(settings); TMaybe<TReadSessionEvent::TEvent> event = session->GetEvent(true); UNIT_ASSERT(event); @@ -1002,7 +1002,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { void StopsRetryAfterFailedAttemptImpl(bool timeout) { TReadSessionImplTestSetup setup; - setup.Settings.RetryPolicy(IRetryPolicy::GetNoRetryPolicy()); + setup.Settings.RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()); EXPECT_CALL(*setup.MockProcessorFactory, OnCreateProcessor(_)) .WillOnce([&]() { if (timeout) @@ -1026,26 +1026,26 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { } Y_UNIT_TEST(UsesOnRetryStateDuringRetries) { - class TTestRetryState : public IRetryState { + class TTestRetryState : public NYdb::NPersQueue::IRetryPolicy::IRetryState { TDuration Delay; - TMaybe<TDuration> GetNextRetryDelay(const NYdb::TStatus&) override { + TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override { Delay += TDuration::Seconds(1); return Delay; } }; - class TTestRetryPolicy : public IRetryPolicy { + class TTestRetryPolicy : public NYdb::NPersQueue::IRetryPolicy { IRetryState::TPtr CreateRetryState() const override { return IRetryState::TPtr(new TTestRetryState()); } }; - std::shared_ptr<IRetryState> state(new TTestRetryState()); + std::shared_ptr<NYdb::NPersQueue::IRetryPolicy::IRetryState> state(new TTestRetryState()); TReadSessionImplTestSetup setup; ON_CALL(*setup.MockProcessorFactory, ValidateConnectTimeout(_)) .WillByDefault([state](TDuration timeout) mutable { - UNIT_ASSERT_VALUES_EQUAL(timeout, *state->GetNextRetryDelay(NYdb::TStatus(TPlainStatus()))); + UNIT_ASSERT_VALUES_EQUAL(timeout, *state->GetNextRetryDelay(NYdb::EStatus::INTERNAL_ERROR)); }); auto failCreation = [&]() { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h index 7fb219757b..69d62dfa48 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h @@ -163,7 +163,7 @@ public: } }; -struct TYdbPqTestRetryState : IRetryState { +struct TYdbPqTestRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState { TYdbPqTestRetryState( std::function<void ()> retryCallback, std::function<void ()> destroyCallback, const TDuration& delay ) @@ -172,7 +172,7 @@ struct TYdbPqTestRetryState : IRetryState { , Delay(delay) {} - TMaybe<TDuration> GetNextRetryDelay(const TStatus&) override { + TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override { Cerr << "Test retry state: get retry delay\n"; RetryDone(); return Delay; @@ -185,9 +185,9 @@ struct TYdbPqTestRetryState : IRetryState { DestroyDone(); } }; -struct TYdbPqNoRetryState : IRetryState { +struct TYdbPqNoRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState { TAtomic DelayCalled = 0; - TMaybe<TDuration> GetNextRetryDelay(const TStatus&) override { + TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override { auto res = AtomicSwap(&DelayCalled, 0); UNIT_ASSERT(!res); return Nothing(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp index 3cf6e7f8b8..66385d41a8 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp @@ -25,7 +25,7 @@ namespace NKikimr::NPersQueueTests { ) { auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId); if (partitionGroup) settings.PartitionGroupId(*partitionGroup); - settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? IRetryPolicy::GetDefaultPolicy() : IRetryPolicy::GetNoRetryPolicy()); + settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? NYdb::NPersQueue::IRetryPolicy::GetDefaultPolicy() : NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()); if (codec) { if (*codec == "raw") settings.Codec(ECodec::RAW); @@ -55,7 +55,7 @@ namespace NKikimr::NPersQueueTests { Y_UNUSED(codec); auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId); if (partitionGroup) settings.PartitionGroupId(*partitionGroup); - settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? IRetryPolicy::GetDefaultPolicy() : IRetryPolicy::GetNoRetryPolicy()); + settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? NYdb::NPersQueue::IRetryPolicy::GetDefaultPolicy() : NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy()); return CreateSimpleWriter(driver, settings); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make index d19d4f6b2e..3314e9284c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make @@ -13,6 +13,7 @@ SRCS( ) PEERDIR( + library/cpp/retry ydb/public/sdk/cpp/client/ydb_persqueue_core/impl ydb/public/sdk/cpp/client/ydb_proto ) |