aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-16 14:31:05 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-16 14:31:05 +0300
commit91baa58196a4f481f246c747dc20c9b25ff96cb4 (patch)
treed94e4d04f452da6d8bcda3fde8ba2e1c6707a583
parentc1dea0408f5bf27e7e79d0439cddb420bec298f5 (diff)
downloadydb-91baa58196a4f481f246c747dc20c9b25ff96cb4.tar.gz
YQ-842 Move IRetryPolicy from PQ SDK to library
ref:6f3f663c57e22b6d97bfc34fb795be48ad8e6ec0
-rw-r--r--library/cpp/retry/retry_policy.h289
-rw-r--r--library/cpp/retry/retry_policy_ut.cpp108
-rw-r--r--library/cpp/retry/ut/ya.make1
-rw-r--r--ydb/core/yq/libs/read_rule/read_rule_creator.cpp4
-rw-r--r--ydb/core/yq/libs/read_rule/read_rule_deleter.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp12
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp186
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h35
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp16
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make1
17 files changed, 438 insertions, 246 deletions
diff --git a/library/cpp/retry/retry_policy.h b/library/cpp/retry/retry_policy.h
new file mode 100644
index 0000000000..a2f94840b8
--- /dev/null
+++ b/library/cpp/retry/retry_policy.h
@@ -0,0 +1,289 @@
+#pragma once
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+#include <util/generic/typetraits.h>
+#include <util/random/random.h>
+
+#include <functional>
+#include <limits>
+#include <memory>
+
+//! Retry policy.
+//! Calculates delay before next retry (if any).
+//! Has several default implementations:
+//! - exponential backoff policy;
+//! - retries with fixed interval;
+//! - no retries.
+
+enum class ERetryErrorClass {
+ // This error shouldn't be retried.
+ NoRetry,
+
+ // This error could be retried in short period of time.
+ ShortRetry,
+
+ // This error requires waiting before it could be retried.
+ LongRetry,
+};
+
+template <class... TArgs>
+struct IRetryPolicy {
+ using TPtr = std::shared_ptr<IRetryPolicy>;
+
+ using TRetryClassFunction = std::function<ERetryErrorClass(typename TTypeTraits<TArgs>::TFuncParam...)>;
+
+ //! Retry state of single request.
+ struct IRetryState {
+ using TPtr = std::unique_ptr<IRetryState>;
+
+ virtual ~IRetryState() = default;
+
+ //! Calculate delay before next retry if next retry is allowed.
+ //! Returns empty maybe if retry is not allowed anymore.
+ virtual TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) = 0;
+ };
+
+ virtual ~IRetryPolicy() = default;
+
+ //! Function that is called after first error
+ //! to find out a futher retry behaviour.
+ //! Retry state is expected to be created for the whole single retry session.
+ virtual typename IRetryState::TPtr CreateRetryState() const = 0;
+
+ //!
+ //! Default implementations.
+ //!
+
+ static TPtr GetNoRetryPolicy(); // Denies all kind of retries.
+
+ //! Randomized exponential backoff policy.
+ static TPtr GetExponentialBackoffPolicy(TRetryClassFunction retryClassFunction,
+ TDuration minDelay = TDuration::MilliSeconds(10),
+ // Delay for statuses that require waiting before retry (such as OVERLOADED).
+ TDuration minLongRetryDelay = TDuration::MilliSeconds(200),
+ TDuration maxDelay = TDuration::Seconds(30),
+ size_t maxRetries = std::numeric_limits<size_t>::max(),
+ TDuration maxTime = TDuration::Max(),
+ double scaleFactor = 2.0);
+
+ //! Randomized fixed interval policy.
+ static TPtr GetFixedIntervalPolicy(TRetryClassFunction retryClassFunction,
+ TDuration delay = TDuration::MilliSeconds(100),
+ // Delay for statuses that require waiting before retry (such as OVERLOADED).
+ TDuration longRetryDelay = TDuration::MilliSeconds(300),
+ size_t maxRetries = std::numeric_limits<size_t>::max(),
+ TDuration maxTime = TDuration::Max());
+};
+
+template <class... TArgs>
+struct TNoRetryPolicy : IRetryPolicy<TArgs...> {
+ using IRetryState = typename IRetryPolicy<TArgs...>::IRetryState;
+
+ struct TNoRetryState : IRetryState {
+ TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam...) override {
+ return Nothing();
+ }
+ };
+
+ typename IRetryState::TPtr CreateRetryState() const override {
+ return std::make_unique<TNoRetryState>();
+ }
+};
+
+namespace NRetryDetails {
+inline TDuration RandomizeDelay(TDuration baseDelay) {
+ const TDuration::TValue half = baseDelay.GetValue() / 2;
+ return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
+}
+} // namespace NRetryDetails
+
+template <class... TArgs>
+struct TExponentialBackoffPolicy : IRetryPolicy<TArgs...> {
+ using IRetryPolicy = IRetryPolicy<TArgs...>;
+ using IRetryState = typename IRetryPolicy::IRetryState;
+
+ struct TExponentialBackoffState : IRetryState {
+ TExponentialBackoffState(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+ TDuration minDelay,
+ TDuration minLongRetryDelay,
+ TDuration maxDelay,
+ size_t maxRetries,
+ TDuration maxTime,
+ double scaleFactor)
+ : MinLongRetryDelay(minLongRetryDelay)
+ , MaxDelay(maxDelay)
+ , MaxRetries(maxRetries)
+ , MaxTime(maxTime)
+ , ScaleFactor(scaleFactor)
+ , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
+ , CurrentDelay(minDelay)
+ , AttemptsDone(0)
+ , RetryClassFunction(std::move(retryClassFunction))
+ {
+ }
+
+ TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
+ const ERetryErrorClass errorClass = RetryClassFunction(args...);
+ if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) {
+ return Nothing();
+ }
+
+ if (errorClass == ERetryErrorClass::LongRetry) {
+ CurrentDelay = Max(CurrentDelay, MinLongRetryDelay);
+ }
+
+ const TDuration delay = NRetryDetails::RandomizeDelay(CurrentDelay);
+
+ if (CurrentDelay < MaxDelay) {
+ CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay);
+ }
+
+ ++AttemptsDone;
+ return delay;
+ }
+
+ const TDuration MinLongRetryDelay;
+ const TDuration MaxDelay;
+ const size_t MaxRetries;
+ const TDuration MaxTime;
+ const double ScaleFactor;
+ const TInstant StartTime;
+ TDuration CurrentDelay;
+ size_t AttemptsDone;
+ typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+ };
+
+ TExponentialBackoffPolicy(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+ TDuration minDelay,
+ TDuration minLongRetryDelay,
+ TDuration maxDelay,
+ size_t maxRetries,
+ TDuration maxTime,
+ double scaleFactor)
+ : MinDelay(minDelay)
+ , MinLongRetryDelay(minLongRetryDelay)
+ , MaxDelay(maxDelay)
+ , MaxRetries(maxRetries)
+ , MaxTime(maxTime)
+ , ScaleFactor(scaleFactor)
+ , RetryClassFunction(std::move(retryClassFunction))
+ {
+ Y_ASSERT(RetryClassFunction);
+ Y_ASSERT(MinDelay < MaxDelay);
+ Y_ASSERT(MinLongRetryDelay < MaxDelay);
+ Y_ASSERT(MinLongRetryDelay >= MinDelay);
+ Y_ASSERT(ScaleFactor > 1.0);
+ Y_ASSERT(MaxRetries > 0);
+ Y_ASSERT(MaxTime > MinDelay);
+ }
+
+ typename IRetryState::TPtr CreateRetryState() const override {
+ return std::make_unique<TExponentialBackoffState>(RetryClassFunction, MinDelay, MinLongRetryDelay, MaxDelay, MaxRetries, MaxTime, ScaleFactor);
+ }
+
+ const TDuration MinDelay;
+ const TDuration MinLongRetryDelay;
+ const TDuration MaxDelay;
+ const size_t MaxRetries;
+ const TDuration MaxTime;
+ const double ScaleFactor;
+ typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+};
+
+template <class... TArgs>
+struct TFixedIntervalPolicy : IRetryPolicy<TArgs...> {
+ using IRetryPolicy = IRetryPolicy<TArgs...>;
+ using IRetryState = typename IRetryPolicy::IRetryState;
+
+ struct TFixedIntervalState : IRetryState {
+ TFixedIntervalState(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+ TDuration delay,
+ TDuration longRetryDelay,
+ size_t maxRetries,
+ TDuration maxTime)
+ : Delay(delay)
+ , LongRetryDelay(longRetryDelay)
+ , MaxRetries(maxRetries)
+ , MaxTime(maxTime)
+ , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
+ , AttemptsDone(0)
+ , RetryClassFunction(std::move(retryClassFunction))
+ {
+ }
+
+ TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
+ const ERetryErrorClass errorClass = RetryClassFunction(args...);
+ if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) {
+ return Nothing();
+ }
+
+ const TDuration delay = NRetryDetails::RandomizeDelay(errorClass == ERetryErrorClass::LongRetry ? LongRetryDelay : Delay);
+
+ ++AttemptsDone;
+ return delay;
+ }
+
+ const TDuration Delay;
+ const TDuration LongRetryDelay;
+ const size_t MaxRetries;
+ const TDuration MaxTime;
+ const TInstant StartTime;
+ size_t AttemptsDone;
+ typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+ };
+
+ TFixedIntervalPolicy(typename IRetryPolicy::TRetryClassFunction retryClassFunction,
+ TDuration delay,
+ TDuration longRetryDelay,
+ size_t maxRetries,
+ TDuration maxTime)
+ : Delay(delay)
+ , LongRetryDelay(longRetryDelay)
+ , MaxRetries(maxRetries)
+ , MaxTime(maxTime)
+ , RetryClassFunction(std::move(retryClassFunction))
+ {
+ Y_ASSERT(RetryClassFunction);
+ Y_ASSERT(MaxRetries > 0);
+ Y_ASSERT(MaxTime > Delay);
+ Y_ASSERT(MaxTime > LongRetryDelay);
+ Y_ASSERT(LongRetryDelay >= Delay);
+ }
+
+ typename IRetryState::TPtr CreateRetryState() const override {
+ return std::make_unique<TFixedIntervalState>(RetryClassFunction, Delay, LongRetryDelay, MaxRetries, MaxTime);
+ }
+
+ const TDuration Delay;
+ const TDuration LongRetryDelay;
+ const size_t MaxRetries;
+ const TDuration MaxTime;
+ typename IRetryPolicy::TRetryClassFunction RetryClassFunction;
+};
+
+template <class... TArgs>
+typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetNoRetryPolicy() {
+ return std::make_shared<TNoRetryPolicy<TArgs...>>();
+}
+
+template <class... TArgs>
+typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetExponentialBackoffPolicy(TRetryClassFunction retryClassFunction,
+ TDuration minDelay,
+ TDuration minLongRetryDelay,
+ TDuration maxDelay,
+ size_t maxRetries,
+ TDuration maxTime,
+ double scaleFactor)
+{
+ return std::make_shared<TExponentialBackoffPolicy<TArgs...>>(std::move(retryClassFunction), minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor);
+}
+
+template <class... TArgs>
+typename IRetryPolicy<TArgs...>::TPtr IRetryPolicy<TArgs...>::GetFixedIntervalPolicy(TRetryClassFunction retryClassFunction,
+ TDuration delay,
+ TDuration longRetryDelay,
+ size_t maxRetries,
+ TDuration maxTime)
+{
+ return std::make_shared<TFixedIntervalPolicy<TArgs...>>(std::move(retryClassFunction), delay, longRetryDelay, maxRetries, maxTime);
+}
diff --git a/library/cpp/retry/retry_policy_ut.cpp b/library/cpp/retry/retry_policy_ut.cpp
new file mode 100644
index 0000000000..32a5896e08
--- /dev/null
+++ b/library/cpp/retry/retry_policy_ut.cpp
@@ -0,0 +1,108 @@
+#include "retry_policy.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(RetryPolicy) {
+ Y_UNIT_TEST(NoRetryPolicy) {
+ auto policy = IRetryPolicy<int>::GetNoRetryPolicy();
+ UNIT_ASSERT(!policy->CreateRetryState()->GetNextRetryDelay(42));
+ }
+
+ using ITestPolicy = IRetryPolicy<ERetryErrorClass>;
+
+ ERetryErrorClass ErrorClassFunction(ERetryErrorClass err) {
+ return err;
+ }
+
+#define ASSERT_INTERVAL(from, to, val) { \
+ auto v = val; \
+ UNIT_ASSERT(v); \
+ UNIT_ASSERT_GE_C(*v, from, *v); \
+ UNIT_ASSERT_LE_C(*v, to, *v); \
+ }
+
+ Y_UNIT_TEST(FixedIntervalPolicy) {
+ auto policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::Seconds(100));
+ auto state = policy->CreateRetryState();
+ for (int i = 0; i < 5; ++i) {
+ ASSERT_INTERVAL(TDuration::MilliSeconds(50), TDuration::MilliSeconds(100), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ ASSERT_INTERVAL(TDuration::Seconds(50), TDuration::Seconds(100), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+ UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::NoRetry));
+ }
+ }
+
+ Y_UNIT_TEST(ExponentialBackoffPolicy) {
+ auto policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::Seconds(100), TDuration::Seconds(500));
+ auto state = policy->CreateRetryState();
+
+ // Step 1
+ ASSERT_INTERVAL(TDuration::MilliSeconds(50), TDuration::MilliSeconds(100), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+
+ // Step 2
+ ASSERT_INTERVAL(TDuration::Seconds(50), TDuration::Seconds(100), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+
+ // Step 3
+ ASSERT_INTERVAL(TDuration::Seconds(100), TDuration::Seconds(200), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+
+ // Step 4
+ ASSERT_INTERVAL(TDuration::Seconds(200), TDuration::Seconds(400), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+
+ // Step 5. Max delay
+ ASSERT_INTERVAL(TDuration::Seconds(250), TDuration::Seconds(500), state->GetNextRetryDelay(ERetryErrorClass::LongRetry));
+ ASSERT_INTERVAL(TDuration::Seconds(250), TDuration::Seconds(500), state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+
+ // No retry
+ UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::NoRetry));
+ }
+
+ void TestMaxRetries(bool exponentialBackoff) {
+ ITestPolicy::TPtr policy;
+ if (exponentialBackoff) {
+ policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 3);
+ } else {
+ policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::MilliSeconds(300), 3);
+ }
+ auto state = policy->CreateRetryState();
+ UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ UNIT_ASSERT(state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ }
+
+ void TestMaxTime(bool exponentialBackoff) {
+ ITestPolicy::TPtr policy;
+ const TDuration maxDelay = TDuration::Seconds(2);
+ if (exponentialBackoff) {
+ policy = ITestPolicy::GetExponentialBackoffPolicy(ErrorClassFunction, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(30), 100500, maxDelay);
+ } else {
+ policy = ITestPolicy::GetFixedIntervalPolicy(ErrorClassFunction, TDuration::MilliSeconds(100), TDuration::MilliSeconds(300), 100500, maxDelay);
+ }
+ const TInstant start = TInstant::Now();
+ auto state = policy->CreateRetryState();
+ for (int i = 0; i < 3; ++i) {
+ auto delay = state->GetNextRetryDelay(ERetryErrorClass::ShortRetry);
+ const TInstant now = TInstant::Now();
+ UNIT_ASSERT(delay || now - start >= maxDelay);
+ }
+ Sleep(maxDelay);
+ UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ UNIT_ASSERT(!state->GetNextRetryDelay(ERetryErrorClass::ShortRetry));
+ }
+
+ Y_UNIT_TEST(MaxRetriesExponentialBackoff) {
+ TestMaxRetries(true);
+ }
+
+ Y_UNIT_TEST(MaxRetriesFixedInterval) {
+ TestMaxRetries(false);
+ }
+
+ Y_UNIT_TEST(MaxTimeExponentialBackoff) {
+ TestMaxTime(true);
+ }
+
+ Y_UNIT_TEST(MaxTimeFixedInterval) {
+ TestMaxTime(false);
+ }
+}
diff --git a/library/cpp/retry/ut/ya.make b/library/cpp/retry/ut/ya.make
index ff8259bfdb..d423f2be6f 100644
--- a/library/cpp/retry/ut/ya.make
+++ b/library/cpp/retry/ut/ya.make
@@ -7,6 +7,7 @@ OWNER(
)
SRCS(
+ retry_policy_ut.cpp
retry_ut.cpp
)
diff --git a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp
index c5d6fa4fa2..0ebdbc4f70 100644
--- a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp
+++ b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp
@@ -136,7 +136,7 @@ public:
if (!RetryState) {
RetryState = NYdb::NPersQueue::IRetryPolicy::GetExponentialBackoffPolicy()->CreateRetryState();
}
- TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status);
+ TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status.GetStatus());
if (status.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
nextRetryDelay = Nothing(); // Not retryable
}
@@ -198,7 +198,7 @@ private:
NYdb::TDriver YdbDriver;
NYdb::NPersQueue::TPersQueueClient PqClient;
ui64 Index = 0;
- NYdb::NPersQueue::IRetryState::TPtr RetryState;
+ NYdb::NPersQueue::IRetryPolicy::IRetryState::TPtr RetryState;
bool RequestInFlight = false;
bool Finishing = false;
};
diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp
index b5c2e4d549..135527411e 100644
--- a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp
+++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp
@@ -129,7 +129,7 @@ public:
2.0 // scaleFactor
)->CreateRetryState();
}
- TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status);
+ TMaybe<TDuration> nextRetryDelay = RetryState->GetNextRetryDelay(status.GetStatus());
if (status.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
nextRetryDelay = Nothing(); // No topic => OK. Leave just transient issues.
}
@@ -173,7 +173,7 @@ private:
NYdb::NPersQueue::TPersQueueClient PqClient;
ui64 Index = 0;
const size_t MaxRetries;
- NYdb::NPersQueue::IRetryState::TPtr RetryState;
+ NYdb::NPersQueue::IRetryPolicy::IRetryState::TPtr RetryState;
};
// Actor for deletion of read rules for all topics in the query.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
index dd42c8c4ed..e0e9fd6857 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp
@@ -4,7 +4,7 @@
namespace NYdb::NPersQueue {
-IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
+ERetryErrorClass GetRetryErrorClass(EStatus status) {
switch (status) {
case EStatus::SUCCESS:
case EStatus::INTERNAL_ERROR:
@@ -19,7 +19,7 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
case EStatus::CLIENT_INTERNAL_ERROR:
case EStatus::CLIENT_CANCELLED:
case EStatus::CLIENT_OUT_OF_RANGE:
- return IRetryPolicy::ERetryErrorClass::ShortRetry;
+ return ERetryErrorClass::ShortRetry;
case EStatus::OVERLOADED:
case EStatus::TIMEOUT:
@@ -28,7 +28,7 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
case EStatus::CLIENT_DEADLINE_EXCEEDED:
case EStatus::CLIENT_LIMITS_REACHED:
case EStatus::CLIENT_DISCOVERY_FAILED:
- return IRetryPolicy::ERetryErrorClass::LongRetry;
+ return ERetryErrorClass::LongRetry;
case EStatus::SCHEME_ERROR:
case EStatus::STATUS_UNDEFINED:
@@ -40,14 +40,14 @@ IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status) {
case EStatus::NOT_FOUND:
case EStatus::CLIENT_UNAUTHENTICATED:
case EStatus::CLIENT_CALL_UNIMPLEMENTED:
- return IRetryPolicy::ERetryErrorClass::NoRetry;
+ return ERetryErrorClass::NoRetry;
}
}
-IRetryPolicy::ERetryErrorClass GetRetryErrorClassV2(EStatus status) {
+ERetryErrorClass GetRetryErrorClassV2(EStatus status) {
switch (status) {
case EStatus::SCHEME_ERROR:
- return IRetryPolicy::ERetryErrorClass::NoRetry;
+ return ERetryErrorClass::NoRetry;
default:
return GetRetryErrorClass(status);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
index 7287b69894..8c140e2217 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
@@ -11,8 +11,8 @@
namespace NYdb::NPersQueue {
-IRetryPolicy::ERetryErrorClass GetRetryErrorClass(EStatus status);
-IRetryPolicy::ERetryErrorClass GetRetryErrorClassV2(EStatus status);
+ERetryErrorClass GetRetryErrorClass(EStatus status);
+ERetryErrorClass GetRetryErrorClassV2(EStatus status);
void Cancel(NGrpc::IQueueClientContextPtr& context);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
index a22ed23cea..aff01791b9 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp
@@ -170,189 +170,13 @@ TAsyncDescribeTopicResult TPersQueueClient::DescribeTopic(const TString& path, c
return Impl_->DescribeTopic(path, settings);
}
-namespace {
-
-struct TNoRetryState : IRetryState {
- TMaybe<TDuration> GetNextRetryDelay(const TStatus&) override {
- return Nothing();
- }
-};
-
-struct TNoRetryPolicy : IRetryPolicy {
- IRetryState::TPtr CreateRetryState() const override {
- return std::make_unique<TNoRetryState>();
- }
-};
-
-TDuration RandomizeDelay(TDuration baseDelay) {
- const TDuration::TValue half = baseDelay.GetValue() / 2;
- return TDuration::FromValue(half + RandomNumber<TDuration::TValue>(half));
-}
-
-struct TExponentialBackoffState : IRetryState {
- TExponentialBackoffState(TDuration minDelay,
- TDuration minLongRetryDelay,
- TDuration maxDelay,
- size_t maxRetries,
- TDuration maxTime,
- double scaleFactor,
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> retryErrorClassFunction)
- : MinLongRetryDelay(minLongRetryDelay)
- , MaxDelay(maxDelay)
- , MaxRetries(maxRetries)
- , MaxTime(maxTime)
- , ScaleFactor(scaleFactor)
- , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
- , CurrentDelay(minDelay)
- , AttemptsDone(0)
- , RetryErrorClassFunction(retryErrorClassFunction)
- {
- }
-
- TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) override {
- const IRetryPolicy::ERetryErrorClass errorClass = RetryErrorClassFunction(status.GetStatus());
- if (AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime || errorClass == IRetryPolicy::ERetryErrorClass::NoRetry) {
- return Nothing();
- }
-
- if (errorClass == IRetryPolicy::ERetryErrorClass::LongRetry) {
- CurrentDelay = Max(CurrentDelay, MinLongRetryDelay);
- }
-
- const TDuration delay = RandomizeDelay(CurrentDelay);
-
- if (CurrentDelay < MaxDelay) {
- CurrentDelay = Min(CurrentDelay * ScaleFactor, MaxDelay);
- }
-
- ++AttemptsDone;
- return delay;
- }
-
- const TDuration MinLongRetryDelay;
- const TDuration MaxDelay;
- const size_t MaxRetries;
- const TDuration MaxTime;
- const double ScaleFactor;
- const TInstant StartTime;
- TDuration CurrentDelay;
- size_t AttemptsDone;
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-struct TExponentialBackoffPolicy : IRetryPolicy {
- TExponentialBackoffPolicy(TDuration minDelay,
- TDuration minLongRetryDelay,
- TDuration maxDelay,
- size_t maxRetries,
- TDuration maxTime,
- double scaleFactor,
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
- : MinDelay(minDelay)
- , MinLongRetryDelay(minLongRetryDelay)
- , MaxDelay(maxDelay)
- , MaxRetries(maxRetries)
- , MaxTime(maxTime)
- , ScaleFactor(scaleFactor)
- , RetryErrorClassFunction(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass)
- {
- Y_ASSERT(MinDelay < MaxDelay);
- Y_ASSERT(MinLongRetryDelay < MaxDelay);
- Y_ASSERT(MinLongRetryDelay >= MinDelay);
- Y_ASSERT(ScaleFactor > 1.0);
- Y_ASSERT(MaxRetries > 0);
- Y_ASSERT(MaxTime > MinDelay);
- }
-
- IRetryState::TPtr CreateRetryState() const override {
- return std::make_unique<TExponentialBackoffState>(MinDelay, MinLongRetryDelay, MaxDelay, MaxRetries, MaxTime, ScaleFactor, RetryErrorClassFunction);
- }
-
- const TDuration MinDelay;
- const TDuration MinLongRetryDelay;
- const TDuration MaxDelay;
- const size_t MaxRetries;
- const TDuration MaxTime;
- const double ScaleFactor;
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-struct TFixedIntervalState : IRetryState {
- TFixedIntervalState(TDuration delay,
- TDuration longRetryDelay,
- size_t maxRetries,
- TDuration maxTime,
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> retryErrorClassFunction)
- : Delay(delay)
- , LongRetryDelay(longRetryDelay)
- , MaxRetries(maxRetries)
- , MaxTime(maxTime)
- , StartTime(maxTime != TDuration::Max() ? TInstant::Now() : TInstant::Zero())
- , AttemptsDone(0)
- , RetryErrorClassFunction(retryErrorClassFunction)
- {
- }
-
- TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) override {
- const IRetryPolicy::ERetryErrorClass errorClass = RetryErrorClassFunction(status.GetStatus());
- if (AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime || errorClass == IRetryPolicy::ERetryErrorClass::NoRetry) {
- return Nothing();
- }
-
- const TDuration delay = RandomizeDelay(errorClass == IRetryPolicy::ERetryErrorClass::LongRetry ? LongRetryDelay : Delay);
-
- ++AttemptsDone;
- return delay;
- }
-
- const TDuration Delay;
- const TDuration LongRetryDelay;
- const size_t MaxRetries;
- const TDuration MaxTime;
- const TInstant StartTime;
- size_t AttemptsDone;
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-struct TFixedIntervalPolicy : IRetryPolicy {
- TFixedIntervalPolicy(TDuration delay,
- TDuration longRetryDelay,
- size_t maxRetries,
- TDuration maxTime,
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
- : Delay(delay)
- , LongRetryDelay(longRetryDelay)
- , MaxRetries(maxRetries)
- , MaxTime(maxTime)
- , RetryErrorClassFunction(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass)
- {
- Y_ASSERT(MaxRetries > 0);
- Y_ASSERT(MaxTime > Delay);
- Y_ASSERT(MaxTime > LongRetryDelay);
- Y_ASSERT(LongRetryDelay >= Delay);
- }
-
- IRetryState::TPtr CreateRetryState() const override {
- return std::make_unique<TFixedIntervalState>(Delay, LongRetryDelay, MaxRetries, MaxTime, RetryErrorClassFunction);
- }
-
- const TDuration Delay;
- const TDuration LongRetryDelay;
- const size_t MaxRetries;
- const TDuration MaxTime;
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> RetryErrorClassFunction;
-};
-
-} // namespace
-
IRetryPolicy::TPtr IRetryPolicy::GetDefaultPolicy() {
static IRetryPolicy::TPtr policy = GetExponentialBackoffPolicy();
return policy;
}
IRetryPolicy::TPtr IRetryPolicy::GetNoRetryPolicy() {
- static IRetryPolicy::TPtr policy = std::make_shared<TNoRetryPolicy>();
- return policy;
+ return ::IRetryPolicy<EStatus>::GetNoRetryPolicy();
}
IRetryPolicy::TPtr IRetryPolicy::GetExponentialBackoffPolicy(TDuration minDelay,
@@ -361,18 +185,18 @@ IRetryPolicy::TPtr IRetryPolicy::GetExponentialBackoffPolicy(TDuration minDelay,
size_t maxRetries,
TDuration maxTime,
double scaleFactor,
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
+ std::function<ERetryErrorClass(EStatus)> customRetryClassFunction)
{
- return std::make_shared<TExponentialBackoffPolicy>(minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor, customRetryClassFunction);
+ return ::IRetryPolicy<EStatus>::GetExponentialBackoffPolicy(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass, minDelay, minLongRetryDelay, maxDelay, maxRetries, maxTime, scaleFactor);
}
IRetryPolicy::TPtr IRetryPolicy::GetFixedIntervalPolicy(TDuration delay,
TDuration longRetryDelay,
size_t maxRetries,
TDuration maxTime,
- std::function<IRetryPolicy::ERetryErrorClass(EStatus)> customRetryClassFunction)
+ std::function<ERetryErrorClass(EStatus)> customRetryClassFunction)
{
- return std::make_shared<TFixedIntervalPolicy>(delay, longRetryDelay, maxRetries, maxTime, customRetryClassFunction);
+ return ::IRetryPolicy<EStatus>::GetFixedIntervalPolicy(customRetryClassFunction ? customRetryClassFunction : GetRetryErrorClass, delay, longRetryDelay, maxRetries, maxTime);
}
std::shared_ptr<IReadSession> TPersQueueClient::CreateReadSession(const TReadSessionSettings& settings) {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index e7dd0a87e5..3b82d7ec0d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -233,7 +233,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
if (!ClusterDiscoveryRetryState) {
ClusterDiscoveryRetryState = Settings.RetryPolicy_->CreateRetryState();
}
- TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status);
+ TMaybe<TDuration> retryDelay = ClusterDiscoveryRetryState->GetNextRetryDelay(status.GetStatus());
if (retryDelay) {
Log << TLOG_INFO << "Cluster discovery request failed. Status: " << status.GetStatus()
<< ". Issues: \"" << IssuesSingleLineString(status.GetIssues()) << "\"";
@@ -724,7 +724,7 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) {
ServerMessage = std::make_shared<Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>();
++ConnectionGeneration;
if (RetryState) {
- TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(TPlainStatus(status));
+ TMaybe<TDuration> nextDelay = RetryState->GetNextRetryDelay(status.Status);
if (nextDelay) {
delay = *nextDelay;
delayContext = ClientContext->CreateContext();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index b5b874954f..e08db09456 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -970,7 +970,7 @@ private:
size_t ConnectionGeneration = 0;
TAdaptiveLock Lock;
IProcessor::TPtr Processor;
- IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting).
+ IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting).
size_t ConnectionAttemptsDone = 0;
// Memory usage.
@@ -1103,7 +1103,7 @@ private:
std::shared_ptr<TReadSessionEventsQueue> EventsQueue;
THashMap<TString, TClusterSessionInfo> ClusterSessions; // Cluster name (in lower case) -> TClusterSessionInfo
NGrpc::IQueueClientContextPtr ClusterDiscoveryDelayContext;
- IRetryState::TPtr ClusterDiscoveryRetryState;
+ IRetryPolicy::IRetryState::TPtr ClusterDiscoveryRetryState;
bool DataReadingSuspended = false;
NGrpc::IQueueClientContextPtr DumpCountersContext;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index af4e08b979..0a338cfc1d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -94,7 +94,7 @@ TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& stat
if (!RetryState) {
RetryState = Settings.RetryPolicy_->CreateRetryState();
}
- nextDelay = RetryState->GetNextRetryDelay(TPlainStatus(status));
+ nextDelay = RetryState->GetNextRetryDelay(status.Status);
if (nextDelay) {
result.StartDelay = *nextDelay;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
index 324550bb02..6f506a0080 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
@@ -387,7 +387,7 @@ private:
size_t ConnectionAttemptsDone = 0;
TAdaptiveLock Lock;
IProcessor::TPtr Processor;
- IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting).
+ IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting).
std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to.
TString SessionId;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index ee9aa75a0a..ddb1b2d82b 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -4,6 +4,7 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/logger/log.h>
+#include <library/cpp/retry/retry_policy.h>
#include <util/datetime/base.h>
#include <util/generic/hash.h>
@@ -885,29 +886,8 @@ TString DebugString(const TReadSessionEvent::TEvent& event);
//! - exponential backoff policy;
//! - retries with fixed interval;
//! - no retries.
-//! TODO: move to common header (not persqueue).
-
-//! Retry state of single request.
-struct IRetryState {
- using TPtr = std::unique_ptr<IRetryState>;
-
- virtual ~IRetryState() = default;
-
- //! Calculate delay before next retry if next retry is allowed.
- //! Returns empty maybe if retry is not allowed anymore.
- virtual TMaybe<TDuration> GetNextRetryDelay(const TStatus& status) = 0;
-};
-
-struct IRetryPolicy {
- using TPtr = std::shared_ptr<IRetryPolicy>;
-
- virtual ~IRetryPolicy() = default;
-
- //! Function that is called after first error
- //! to find out a futher retry behaviour.
- //! Retry state is expected to be created for the whole single retry session.
- virtual IRetryState::TPtr CreateRetryState() const = 0;
+struct IRetryPolicy : ::IRetryPolicy<EStatus> {
//!
//! Default implementations.
//!
@@ -915,17 +895,6 @@ struct IRetryPolicy {
static TPtr GetDefaultPolicy(); // Exponential backoff with infinite retry attempts.
static TPtr GetNoRetryPolicy(); // Denies all kind of retries.
- enum class ERetryErrorClass {
- // This error shouldn't be retried.
- NoRetry,
-
- // This error could be retried in short period of time.
- ShortRetry,
-
- // This error requires waiting before it could be retried.
- LongRetry,
- };
-
//! Randomized exponential backoff policy.
static TPtr GetExponentialBackoffPolicy(TDuration minDelay = TDuration::MilliSeconds(10),
// Delay for statuses that require waiting before retry (such as OVERLOADED).
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 9b2367e3f3..002dd47290 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -578,7 +578,7 @@ TReadSessionImplTestSetup::TReadSessionImplTestSetup() {
Settings
.AppendTopics({"TestTopic"})
.ConsumerName("TestConsumer")
- .RetryPolicy(IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10)))
+ .RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10)))
.Counters(MakeIntrusive<NYdb::NPersQueue::TReaderCounters>(MakeIntrusive<NMonitoring::TDynamicCounters>()));
Log.SetFormatter(GetPrefixLogFormatter(""));
@@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) {
TReadSessionSettings settings = setup.GetReadSessionSettings();
// Set policy with max retries == 3.
- settings.RetryPolicy(IRetryPolicy::GetExponentialBackoffPolicy(TDuration::MilliSeconds(10), TDuration::MilliSeconds(10), TDuration::MilliSeconds(100), 3));
+ settings.RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetExponentialBackoffPolicy(TDuration::MilliSeconds(10), TDuration::MilliSeconds(10), TDuration::MilliSeconds(100), 3));
std::shared_ptr<IReadSession> session = setup.GetPersQueueClient().CreateReadSession(settings);
TMaybe<TReadSessionEvent::TEvent> event = session->GetEvent(true);
UNIT_ASSERT(event);
@@ -1002,7 +1002,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
void StopsRetryAfterFailedAttemptImpl(bool timeout) {
TReadSessionImplTestSetup setup;
- setup.Settings.RetryPolicy(IRetryPolicy::GetNoRetryPolicy());
+ setup.Settings.RetryPolicy(NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy());
EXPECT_CALL(*setup.MockProcessorFactory, OnCreateProcessor(_))
.WillOnce([&]() {
if (timeout)
@@ -1026,26 +1026,26 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
}
Y_UNIT_TEST(UsesOnRetryStateDuringRetries) {
- class TTestRetryState : public IRetryState {
+ class TTestRetryState : public NYdb::NPersQueue::IRetryPolicy::IRetryState {
TDuration Delay;
- TMaybe<TDuration> GetNextRetryDelay(const NYdb::TStatus&) override {
+ TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override {
Delay += TDuration::Seconds(1);
return Delay;
}
};
- class TTestRetryPolicy : public IRetryPolicy {
+ class TTestRetryPolicy : public NYdb::NPersQueue::IRetryPolicy {
IRetryState::TPtr CreateRetryState() const override {
return IRetryState::TPtr(new TTestRetryState());
}
};
- std::shared_ptr<IRetryState> state(new TTestRetryState());
+ std::shared_ptr<NYdb::NPersQueue::IRetryPolicy::IRetryState> state(new TTestRetryState());
TReadSessionImplTestSetup setup;
ON_CALL(*setup.MockProcessorFactory, ValidateConnectTimeout(_))
.WillByDefault([state](TDuration timeout) mutable {
- UNIT_ASSERT_VALUES_EQUAL(timeout, *state->GetNextRetryDelay(NYdb::TStatus(TPlainStatus())));
+ UNIT_ASSERT_VALUES_EQUAL(timeout, *state->GetNextRetryDelay(NYdb::EStatus::INTERNAL_ERROR));
});
auto failCreation = [&]() {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h
index 7fb219757b..69d62dfa48 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils.h
@@ -163,7 +163,7 @@ public:
}
};
-struct TYdbPqTestRetryState : IRetryState {
+struct TYdbPqTestRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState {
TYdbPqTestRetryState(
std::function<void ()> retryCallback, std::function<void ()> destroyCallback, const TDuration& delay
)
@@ -172,7 +172,7 @@ struct TYdbPqTestRetryState : IRetryState {
, Delay(delay)
{}
- TMaybe<TDuration> GetNextRetryDelay(const TStatus&) override {
+ TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override {
Cerr << "Test retry state: get retry delay\n";
RetryDone();
return Delay;
@@ -185,9 +185,9 @@ struct TYdbPqTestRetryState : IRetryState {
DestroyDone();
}
};
-struct TYdbPqNoRetryState : IRetryState {
+struct TYdbPqNoRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState {
TAtomic DelayCalled = 0;
- TMaybe<TDuration> GetNextRetryDelay(const TStatus&) override {
+ TMaybe<TDuration> GetNextRetryDelay(NYdb::EStatus) override {
auto res = AtomicSwap(&DelayCalled, 0);
UNIT_ASSERT(!res);
return Nothing();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp
index 3cf6e7f8b8..66385d41a8 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/data_plane_helpers.cpp
@@ -25,7 +25,7 @@ namespace NKikimr::NPersQueueTests {
) {
auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId);
if (partitionGroup) settings.PartitionGroupId(*partitionGroup);
- settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? IRetryPolicy::GetDefaultPolicy() : IRetryPolicy::GetNoRetryPolicy());
+ settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? NYdb::NPersQueue::IRetryPolicy::GetDefaultPolicy() : NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy());
if (codec) {
if (*codec == "raw")
settings.Codec(ECodec::RAW);
@@ -55,7 +55,7 @@ namespace NKikimr::NPersQueueTests {
Y_UNUSED(codec);
auto settings = TWriteSessionSettings().Path(topic).MessageGroupId(sourceId);
if (partitionGroup) settings.PartitionGroupId(*partitionGroup);
- settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? IRetryPolicy::GetDefaultPolicy() : IRetryPolicy::GetNoRetryPolicy());
+ settings.RetryPolicy((reconnectOnFailure && *reconnectOnFailure) ? NYdb::NPersQueue::IRetryPolicy::GetDefaultPolicy() : NYdb::NPersQueue::IRetryPolicy::GetNoRetryPolicy());
return CreateSimpleWriter(driver, settings);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
index d19d4f6b2e..3314e9284c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
@@ -13,6 +13,7 @@ SRCS(
)
PEERDIR(
+ library/cpp/retry
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl
ydb/public/sdk/cpp/client/ydb_proto
)