diff options
| author | dvrazumov <[email protected]> | 2023-09-09 18:58:59 +0300 |
|---|---|---|
| committer | dvrazumov <[email protected]> | 2023-09-09 19:19:02 +0300 |
| commit | 525aa97d3b20b4fe8d1b56282eadf8a5c75becc5 (patch) | |
| tree | bbd3a95e0c99a101cfa112cc84ec9e6821d37b65 | |
| parent | 7082400c058c777817caaadd12743bfe0323cdef (diff) | |
KIKIMR-19051: add duration to retry context
| -rw-r--r-- | ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h | 29 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h | 65 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h | 51 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/client.cpp | 18 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/client.h | 3 | ||||
| -rw-r--r-- | ydb/public/sdk/cpp/client/ydb_retry/retry.h | 1 | ||||
| -rw-r--r-- | ydb/services/ydb/ydb_table_ut.cpp | 46 |
7 files changed, 152 insertions, 61 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h index 937884b6735..e1c13da2d4a 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h @@ -6,6 +6,7 @@ #include <library/cpp/threading/future/core/fwd.h> #include <util/datetime/base.h> +#include <util/datetime/cputimer.h> #include <util/generic/maybe.h> #include <util/generic/ptr.h> #include <util/system/types.h> @@ -33,22 +34,23 @@ enum class NextStep { class TRetryContextBase : TNonCopyable { protected: - TRetryOperationSettings Settings; - ui32 RetryNumber; + TRetryOperationSettings Settings_; + ui32 RetryNumber_; + TSimpleTimer RetryTimer_; protected: TRetryContextBase(const TRetryOperationSettings& settings) - : Settings(settings) - , RetryNumber(0) + : Settings_(settings) + , RetryNumber_(0) {} virtual void Reset() {} void LogRetry(const TStatus& status) { - if (Settings.Verbose_) { + if (Settings_.Verbose_) { Cerr << "Previous query attempt was finished with unsuccessful status " << status.GetStatus() << ": " << status.GetIssues().ToString(true) << Endl; - Cerr << "Sending retry attempt " << RetryNumber << " of " << Settings.MaxRetries_ << Endl; + Cerr << "Sending retry attempt " << RetryNumber_ << " of " << Settings_.MaxRetries_ << Endl; } } @@ -56,7 +58,10 @@ protected: if (status.IsSuccess()) { return NextStep::Finish; } - if (RetryNumber >= Settings.MaxRetries_) { + if (RetryNumber_ >= Settings_.MaxRetries_) { + return NextStep::Finish; + } + if (RetryTimer_.Get() >= Settings_.MaxTimeout_) { return NextStep::Finish; } switch (status.GetStatus()) { @@ -76,21 +81,21 @@ protected: return NextStep::RetryImmediately; case EStatus::NOT_FOUND: - if (Settings.RetryNotFound_) { + if (Settings_.RetryNotFound_) { return NextStep::RetryImmediately; } else { return NextStep::Finish; } case EStatus::UNDETERMINED: - if (Settings.Idempotent_) { + if (Settings_.Idempotent_) { return NextStep::RetryFastBackoff; } else { return NextStep::Finish; } case EStatus::TRANSPORT_UNAVAILABLE: - if (Settings.Idempotent_) { + if (Settings_.Idempotent_) { Reset(); return NextStep::RetryFastBackoff; } else { @@ -101,6 +106,10 @@ protected: return NextStep::Finish; } } + + TDuration GetRemainingTimeout() { + return Settings_.MaxTimeout_ - RetryTimer_.Get(); + } }; } // namespace NYdb::NRetry diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h index e07e6cdd7a2..931d3382c52 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h @@ -13,20 +13,21 @@ public: using TPtr = TIntrusivePtr<Async::TRetryContext<TClient, TAsyncStatusType>>; protected: - TClient Client; - NThreading::TPromise<TStatusType> Promise; + TClient Client_; + NThreading::TPromise<TStatusType> Promise_; public: TAsyncStatusType Execute() { + this->RetryTimer_.Reset(); this->Retry(); - return this->Promise.GetFuture(); + return this->Promise_.GetFuture(); } protected: explicit TRetryContext(const TClient& client, const TRetryOperationSettings& settings) : TRetryContextBase(settings) - , Client(client) - , Promise(NThreading::NewPromise<TStatusType>()) + , Client_(client) + , Promise_(NThreading::NewPromise<TStatusType>()) {} virtual void Retry() = 0; @@ -38,21 +39,21 @@ protected: } static void DoBackoff(TPtr self, bool fast) { - auto backoffSettings = fast ? self->Settings.FastBackoffSettings_ - : self->Settings.SlowBackoffSettings_; - AsyncBackoff(self->Client.Impl_, backoffSettings, self->RetryNumber, + auto backoffSettings = fast ? self->Settings_.FastBackoffSettings_ + : self->Settings_.SlowBackoffSettings_; + AsyncBackoff(self->Client_.Impl_, backoffSettings, self->RetryNumber_, [self]() {DoRetry(self);}); } static void HandleExceptionAsync(TPtr self, std::exception_ptr e) { - self->Promise.SetException(e); + self->Promise_.SetException(e); } static void HandleStatusAsync(TPtr self, const TStatusType& status) { auto nextStep = self->GetNextStep(status); if (nextStep != NextStep::Finish) { - self->RetryNumber++; - self->Client.Impl_->CollectRetryStatAsync(status.GetStatus()); + self->RetryNumber_++; + self->Client_.Impl_->CollectRetryStatAsync(status.GetStatus()); self->LogRetry(status); } switch (nextStep) { @@ -63,7 +64,7 @@ protected: case NextStep::RetrySlowBackoff: return DoBackoff(self, false); case NextStep::Finish: - return self->Promise.SetValue(status); + return self->Promise_.SetValue(status); } } @@ -86,13 +87,13 @@ class TRetryWithoutSession : public TRetryContext<TClient, TAsyncStatusType> { using TPtr = typename TRetryContext::TPtr; private: - TOperation Operation; + TOperation Operation_; public: - explicit TRetryWithoutSession(const TClient& client, TOperation&& operation, - const TRetryOperationSettings& settings) + explicit TRetryWithoutSession( + const TClient& client, TOperation&& operation, const TRetryOperationSettings& settings) : TRetryContext(client, settings) - , Operation(operation) + , Operation_(operation) {} void Retry() override { @@ -102,7 +103,11 @@ public: protected: TAsyncStatusType RunOperation() override { - return Operation(this->Client); + if constexpr (TFunctionArgs<TOperation>::Length == 1) { + return Operation_(this->Client_); + } else { + return Operation_(this->Client_, this->GetRemainingTimeout()); + } } }; @@ -116,21 +121,21 @@ class TRetryWithSession : public TRetryContext<TClient, TAsyncStatusType> { using TAsyncCreateSessionResult = typename TClient::TAsyncCreateSessionResult; private: - TOperation Operation; - TMaybe<TSession> Session; + TOperation Operation_; + TMaybe<TSession> Session_; public: - explicit TRetryWithSession(const TClient& client, TOperation&& operation, - const TRetryOperationSettings& settings) + explicit TRetryWithSession( + const TClient& client, TOperation&& operation, const TRetryOperationSettings& settings) : TRetryContextAsync(client, settings) - , Operation(operation) + , Operation_(operation) {} void Retry() override { TPtr self(this); - if (!Session) { - auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_); - this->Client.GetSession(settings).Subscribe( + if (!Session_) { + auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_); + this->Client_.GetSession(settings).Subscribe( [self](const TAsyncCreateSessionResult& resultFuture) { try { auto& result = resultFuture.GetValue(); @@ -139,7 +144,7 @@ public: } auto* myself = dynamic_cast<TRetryWithSession*>(self.Get()); - myself->Session = result.GetSession(); + myself->Session_ = result.GetSession(); myself->DoRunOperation(self); } catch (...) { return TRetryContextAsync::HandleExceptionAsync(self, std::current_exception()); @@ -153,11 +158,15 @@ public: private: void Reset() override { - Session.Clear(); + Session_.Clear(); } TAsyncStatusType RunOperation() override { - return Operation(this->Session.GetRef()); + if constexpr (TFunctionArgs<TOperation>::Length == 1) { + return Operation_(this->Session_.GetRef()); + } else { + return Operation_(this->Session_.GetRef(), this->GetRemainingTimeout()); + } } }; diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h index f62e4a468a6..27a0a3c902c 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h @@ -11,12 +11,13 @@ namespace NYdb::NRetry::Sync { template <typename TClient, typename TStatusType> class TRetryContext : public TRetryContextBase { protected: - TClient Client; + TClient& Client_; public: TStatusType Execute() { + this->RetryTimer_.Reset(); TStatusType status = Retry(); // first attempt - for (this->RetryNumber = 0; this->RetryNumber <= this->Settings.MaxRetries_;) { + for (this->RetryNumber_ = 0; this->RetryNumber_ <= this->Settings_.MaxRetries_;) { auto nextStep = this->GetNextStep(status); switch (nextStep) { case NextStep::RetryImmediately: @@ -31,9 +32,9 @@ public: return status; } // make next retry - this->RetryNumber++; + this->RetryNumber_++; this->LogRetry(status); - this->Client.Impl_->CollectRetryStatSync(status.GetStatus()); + this->Client_.Impl_->CollectRetryStatSync(status.GetStatus()); status = Retry(); } return status; @@ -42,7 +43,7 @@ public: protected: TRetryContext(TClient& client, const TRetryOperationSettings& settings) : TRetryContextBase(settings) - , Client(client) + , Client_(client) {} virtual TStatusType Retry() = 0; @@ -50,21 +51,21 @@ protected: virtual TStatusType RunOperation() = 0; void DoBackoff(bool fast) { - const auto &settings = fast ? this->Settings.FastBackoffSettings_ - : this->Settings.SlowBackoffSettings_; - Backoff(settings, this->RetryNumber); + const auto &settings = fast ? this->Settings_.FastBackoffSettings_ + : this->Settings_.SlowBackoffSettings_; + Backoff(settings, this->RetryNumber_); } }; template<typename TClient, typename TOperation, typename TStatusType = TFunctionResult<TOperation>> class TRetryWithoutSession : public TRetryContext<TClient, TStatusType> { private: - const TOperation& Operation; + const TOperation& Operation_; public: TRetryWithoutSession(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings) : TRetryContext<TClient, TStatusType>(client, settings) - , Operation(operation) + , Operation_(operation) {} protected: @@ -73,7 +74,11 @@ protected: } TStatusType RunOperation() override { - return Operation(this->Client); + if constexpr (TFunctionArgs<TOperation>::Length == 1) { + return Operation_(this->Client_); + } else { + return Operation_(this->Client_, this->GetRemainingTimeout()); + } } }; @@ -83,29 +88,29 @@ class TRetryWithSession : public TRetryContext<TClient, TStatusType> { using TCreateSessionSettings = typename TClient::TCreateSessionSettings; private: - const TOperation& Operation; - TMaybe<TSession> Session; + const TOperation& Operation_; + TMaybe<TSession> Session_; public: TRetryWithSession(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings) : TRetryContext<TClient, TStatusType>(client, settings) - , Operation(operation) + , Operation_(operation) {} protected: TStatusType Retry() override { TMaybe<TStatusType> status; - if (!Session) { - auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_); - auto sessionResult = this->Client.GetSession(settings).GetValueSync(); + if (!Session_) { + auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_); + auto sessionResult = this->Client_.GetSession(settings).GetValueSync(); if (sessionResult.IsSuccess()) { - Session = sessionResult.GetSession(); + Session_ = sessionResult.GetSession(); } status = TStatusType(TStatus(sessionResult)); } - if (Session) { + if (Session_) { status = RunOperation(); } @@ -113,11 +118,15 @@ protected: } TStatusType RunOperation() override { - return Operation(this->Session.GetRef()); + if constexpr (TFunctionArgs<TOperation>::Length == 1) { + return Operation_(this->Session_.GetRef()); + } else { + return Operation_(this->Session_.GetRef(), this->GetRemainingTimeout()); + } } void Reset() override { - Session.Clear(); + Session_.Clear(); } }; diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index b0987066e7e..41a1f0858c3 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -20,6 +20,13 @@ namespace NYdb::NQuery { using TRetryContextAsync = NRetry::Async::TRetryContext<TQueryClient, TAsyncExecuteQueryResult>; +NYdb::NRetry::TRetryOperationSettings GetRetrySettings(TDuration timeout, bool isIndempotent) { + return NYdb::NRetry::TRetryOperationSettings() + .Idempotent(isIndempotent) + .GetSessionClientTimeout(timeout) + .MaxTimeout(timeout); +} + TCreateSessionSettings::TCreateSessionSettings() { ClientTimeout_ = TDuration::Seconds(5); }; @@ -426,6 +433,17 @@ TAsyncExecuteQueryResult TQueryClient::RetryQuery(TQueryFunc&& queryFunc, TRetry return ctx->Execute(); } +TAsyncExecuteQueryResult TQueryClient::RetryQuery(const TString& query, const TTxControl& txControl, + TDuration timeout, bool isIndempotent) +{ + auto settings = GetRetrySettings(timeout, isIndempotent); + auto queryFunc = [&query, &txControl](TSession session, TDuration duration) -> TAsyncExecuteQueryResult { + return session.ExecuteQuery(query, txControl, TExecuteQuerySettings().ClientTimeout(duration)); + }; + TRetryContextAsync::TPtr ctx(new NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings)); + return ctx->Execute(); +} + //////////////////////////////////////////////////////////////////////////////// TCreateSessionResult::TCreateSessionResult(TStatus&& status, TSession&& session) diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h index 31069a26c03..7499f998461 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/ydb_query/client.h @@ -84,6 +84,9 @@ public: TAsyncExecuteQueryResult RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings()); + TAsyncExecuteQueryResult RetryQuery(const TString& query, const TTxControl& txControl, + TDuration timeout, bool isIndempotent); + NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings = TExecuteScriptSettings()); diff --git a/ydb/public/sdk/cpp/client/ydb_retry/retry.h b/ydb/public/sdk/cpp/client/ydb_retry/retry.h index d879f00e123..1143d137e5e 100644 --- a/ydb/public/sdk/cpp/client/ydb_retry/retry.h +++ b/ydb/public/sdk/cpp/client/ydb_retry/retry.h @@ -19,6 +19,7 @@ struct TRetryOperationSettings { FLUENT_SETTING_DEFAULT(ui32, MaxRetries, 10); FLUENT_SETTING_DEFAULT(bool, RetryNotFound, true); FLUENT_SETTING_DEFAULT(TDuration, GetSessionClientTimeout, TDuration::Seconds(5)); + FLUENT_SETTING_DEFAULT(TDuration, MaxTimeout, TDuration::Max()); FLUENT_SETTING_DEFAULT(TBackoffSettings, FastBackoffSettings, DefaultFastBackoffSettings()); FLUENT_SETTING_DEFAULT(TBackoffSettings, SlowBackoffSettings, DefaultSlowBackoffSettings()); FLUENT_SETTING_FLAG(Idempotent); diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index 3b4bf878f19..97d2b03f33e 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -1993,11 +1993,11 @@ R"___(<main>: Error: Transaction not found: , code: 2015 void CheckRetryResult(const TStatus& status, const TVector<TResultSet>& resultSets, bool expectSuccess) { if (expectSuccess) { - UNIT_ASSERT(status.IsSuccess()); + UNIT_ASSERT_C(status.IsSuccess(), status); UNIT_ASSERT_VALUES_EQUAL(resultSets.size(), 1); UNIT_ASSERT_VALUES_EQUAL(resultSets[0].ColumnsCount(), 3); } else { - UNIT_ASSERT(!status.IsSuccess()); + UNIT_ASSERT_C(!status.IsSuccess(), status); } } @@ -2104,6 +2104,48 @@ R"___(<main>: Error: Transaction not found: , code: 2015 driver.Stop(true); } + Y_UNIT_TEST(RetryOperationLimitedDuration) { + TKikimrWithGrpcAndRootSchema server; + NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + NYdb::NTable::TTableClient client(driver); + + const size_t MaxRetries = 3; // OVERLOADED error has slow backoff policy, 2 retries takes about 2 sec + const double MaxDurationSec = 0.5; // Less than 2 sec, should be enough to fail on retry with duration limit + const auto retrySettings = TRetryOperationSettings().MaxRetries(MaxRetries).Verbose(true); + const auto retrySettingsLimited = TRetryOperationSettings(retrySettings).MaxTimeout(TDuration::Seconds(MaxDurationSec)); + size_t retryNumber = 0; + + // Asynchronous version + auto operation = [&retryNumber] (TSession /*session*/) -> TAsyncStatus { + if (retryNumber++ < MaxRetries) { + return NThreading::MakeFuture(TStatus(EStatus::OVERLOADED, {})); + } + return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {})); + }; + + retryNumber = 0; + UNIT_ASSERT(client.RetryOperation(operation, retrySettings).GetValueSync().IsSuccess()); + + retryNumber = 0; + UNIT_ASSERT(!client.RetryOperation(operation, retrySettingsLimited).GetValueSync().IsSuccess()); + + // Synchronous version + auto operationSync = [&retryNumber] (TSession /*session*/) -> TStatus { + if (retryNumber++ < MaxRetries) { + return TStatus(EStatus::OVERLOADED, {}); + } + return TStatus(EStatus::SUCCESS, {}); + }; + + retryNumber = 0; + UNIT_ASSERT(client.RetryOperationSync(operationSync, retrySettings).IsSuccess()); + + retryNumber = 0; + UNIT_ASSERT(!client.RetryOperationSync(operationSync, retrySettingsLimited).IsSuccess()); + + driver.Stop(true); + } + Y_UNIT_TEST(QueryLimits) { NKikimrConfig::TAppConfig appConfig; auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); |
