summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordvrazumov <[email protected]>2023-09-09 18:58:59 +0300
committerdvrazumov <[email protected]>2023-09-09 19:19:02 +0300
commit525aa97d3b20b4fe8d1b56282eadf8a5c75becc5 (patch)
treebbd3a95e0c99a101cfa112cc84ec9e6821d37b65
parent7082400c058c777817caaadd12743bfe0323cdef (diff)
KIKIMR-19051: add duration to retry context
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h29
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h65
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h51
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp18
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_retry/retry.h1
-rw-r--r--ydb/services/ydb/ydb_table_ut.cpp46
7 files changed, 152 insertions, 61 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h
index 937884b6735..e1c13da2d4a 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h
@@ -6,6 +6,7 @@
#include <library/cpp/threading/future/core/fwd.h>
#include <util/datetime/base.h>
+#include <util/datetime/cputimer.h>
#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
#include <util/system/types.h>
@@ -33,22 +34,23 @@ enum class NextStep {
class TRetryContextBase : TNonCopyable {
protected:
- TRetryOperationSettings Settings;
- ui32 RetryNumber;
+ TRetryOperationSettings Settings_;
+ ui32 RetryNumber_;
+ TSimpleTimer RetryTimer_;
protected:
TRetryContextBase(const TRetryOperationSettings& settings)
- : Settings(settings)
- , RetryNumber(0)
+ : Settings_(settings)
+ , RetryNumber_(0)
{}
virtual void Reset() {}
void LogRetry(const TStatus& status) {
- if (Settings.Verbose_) {
+ if (Settings_.Verbose_) {
Cerr << "Previous query attempt was finished with unsuccessful status "
<< status.GetStatus() << ": " << status.GetIssues().ToString(true) << Endl;
- Cerr << "Sending retry attempt " << RetryNumber << " of " << Settings.MaxRetries_ << Endl;
+ Cerr << "Sending retry attempt " << RetryNumber_ << " of " << Settings_.MaxRetries_ << Endl;
}
}
@@ -56,7 +58,10 @@ protected:
if (status.IsSuccess()) {
return NextStep::Finish;
}
- if (RetryNumber >= Settings.MaxRetries_) {
+ if (RetryNumber_ >= Settings_.MaxRetries_) {
+ return NextStep::Finish;
+ }
+ if (RetryTimer_.Get() >= Settings_.MaxTimeout_) {
return NextStep::Finish;
}
switch (status.GetStatus()) {
@@ -76,21 +81,21 @@ protected:
return NextStep::RetryImmediately;
case EStatus::NOT_FOUND:
- if (Settings.RetryNotFound_) {
+ if (Settings_.RetryNotFound_) {
return NextStep::RetryImmediately;
} else {
return NextStep::Finish;
}
case EStatus::UNDETERMINED:
- if (Settings.Idempotent_) {
+ if (Settings_.Idempotent_) {
return NextStep::RetryFastBackoff;
} else {
return NextStep::Finish;
}
case EStatus::TRANSPORT_UNAVAILABLE:
- if (Settings.Idempotent_) {
+ if (Settings_.Idempotent_) {
Reset();
return NextStep::RetryFastBackoff;
} else {
@@ -101,6 +106,10 @@ protected:
return NextStep::Finish;
}
}
+
+ TDuration GetRemainingTimeout() {
+ return Settings_.MaxTimeout_ - RetryTimer_.Get();
+ }
};
} // namespace NYdb::NRetry
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h
index e07e6cdd7a2..931d3382c52 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h
@@ -13,20 +13,21 @@ public:
using TPtr = TIntrusivePtr<Async::TRetryContext<TClient, TAsyncStatusType>>;
protected:
- TClient Client;
- NThreading::TPromise<TStatusType> Promise;
+ TClient Client_;
+ NThreading::TPromise<TStatusType> Promise_;
public:
TAsyncStatusType Execute() {
+ this->RetryTimer_.Reset();
this->Retry();
- return this->Promise.GetFuture();
+ return this->Promise_.GetFuture();
}
protected:
explicit TRetryContext(const TClient& client, const TRetryOperationSettings& settings)
: TRetryContextBase(settings)
- , Client(client)
- , Promise(NThreading::NewPromise<TStatusType>())
+ , Client_(client)
+ , Promise_(NThreading::NewPromise<TStatusType>())
{}
virtual void Retry() = 0;
@@ -38,21 +39,21 @@ protected:
}
static void DoBackoff(TPtr self, bool fast) {
- auto backoffSettings = fast ? self->Settings.FastBackoffSettings_
- : self->Settings.SlowBackoffSettings_;
- AsyncBackoff(self->Client.Impl_, backoffSettings, self->RetryNumber,
+ auto backoffSettings = fast ? self->Settings_.FastBackoffSettings_
+ : self->Settings_.SlowBackoffSettings_;
+ AsyncBackoff(self->Client_.Impl_, backoffSettings, self->RetryNumber_,
[self]() {DoRetry(self);});
}
static void HandleExceptionAsync(TPtr self, std::exception_ptr e) {
- self->Promise.SetException(e);
+ self->Promise_.SetException(e);
}
static void HandleStatusAsync(TPtr self, const TStatusType& status) {
auto nextStep = self->GetNextStep(status);
if (nextStep != NextStep::Finish) {
- self->RetryNumber++;
- self->Client.Impl_->CollectRetryStatAsync(status.GetStatus());
+ self->RetryNumber_++;
+ self->Client_.Impl_->CollectRetryStatAsync(status.GetStatus());
self->LogRetry(status);
}
switch (nextStep) {
@@ -63,7 +64,7 @@ protected:
case NextStep::RetrySlowBackoff:
return DoBackoff(self, false);
case NextStep::Finish:
- return self->Promise.SetValue(status);
+ return self->Promise_.SetValue(status);
}
}
@@ -86,13 +87,13 @@ class TRetryWithoutSession : public TRetryContext<TClient, TAsyncStatusType> {
using TPtr = typename TRetryContext::TPtr;
private:
- TOperation Operation;
+ TOperation Operation_;
public:
- explicit TRetryWithoutSession(const TClient& client, TOperation&& operation,
- const TRetryOperationSettings& settings)
+ explicit TRetryWithoutSession(
+ const TClient& client, TOperation&& operation, const TRetryOperationSettings& settings)
: TRetryContext(client, settings)
- , Operation(operation)
+ , Operation_(operation)
{}
void Retry() override {
@@ -102,7 +103,11 @@ public:
protected:
TAsyncStatusType RunOperation() override {
- return Operation(this->Client);
+ if constexpr (TFunctionArgs<TOperation>::Length == 1) {
+ return Operation_(this->Client_);
+ } else {
+ return Operation_(this->Client_, this->GetRemainingTimeout());
+ }
}
};
@@ -116,21 +121,21 @@ class TRetryWithSession : public TRetryContext<TClient, TAsyncStatusType> {
using TAsyncCreateSessionResult = typename TClient::TAsyncCreateSessionResult;
private:
- TOperation Operation;
- TMaybe<TSession> Session;
+ TOperation Operation_;
+ TMaybe<TSession> Session_;
public:
- explicit TRetryWithSession(const TClient& client, TOperation&& operation,
- const TRetryOperationSettings& settings)
+ explicit TRetryWithSession(
+ const TClient& client, TOperation&& operation, const TRetryOperationSettings& settings)
: TRetryContextAsync(client, settings)
- , Operation(operation)
+ , Operation_(operation)
{}
void Retry() override {
TPtr self(this);
- if (!Session) {
- auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_);
- this->Client.GetSession(settings).Subscribe(
+ if (!Session_) {
+ auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_);
+ this->Client_.GetSession(settings).Subscribe(
[self](const TAsyncCreateSessionResult& resultFuture) {
try {
auto& result = resultFuture.GetValue();
@@ -139,7 +144,7 @@ public:
}
auto* myself = dynamic_cast<TRetryWithSession*>(self.Get());
- myself->Session = result.GetSession();
+ myself->Session_ = result.GetSession();
myself->DoRunOperation(self);
} catch (...) {
return TRetryContextAsync::HandleExceptionAsync(self, std::current_exception());
@@ -153,11 +158,15 @@ public:
private:
void Reset() override {
- Session.Clear();
+ Session_.Clear();
}
TAsyncStatusType RunOperation() override {
- return Operation(this->Session.GetRef());
+ if constexpr (TFunctionArgs<TOperation>::Length == 1) {
+ return Operation_(this->Session_.GetRef());
+ } else {
+ return Operation_(this->Session_.GetRef(), this->GetRemainingTimeout());
+ }
}
};
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h
index f62e4a468a6..27a0a3c902c 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h
@@ -11,12 +11,13 @@ namespace NYdb::NRetry::Sync {
template <typename TClient, typename TStatusType>
class TRetryContext : public TRetryContextBase {
protected:
- TClient Client;
+ TClient& Client_;
public:
TStatusType Execute() {
+ this->RetryTimer_.Reset();
TStatusType status = Retry(); // first attempt
- for (this->RetryNumber = 0; this->RetryNumber <= this->Settings.MaxRetries_;) {
+ for (this->RetryNumber_ = 0; this->RetryNumber_ <= this->Settings_.MaxRetries_;) {
auto nextStep = this->GetNextStep(status);
switch (nextStep) {
case NextStep::RetryImmediately:
@@ -31,9 +32,9 @@ public:
return status;
}
// make next retry
- this->RetryNumber++;
+ this->RetryNumber_++;
this->LogRetry(status);
- this->Client.Impl_->CollectRetryStatSync(status.GetStatus());
+ this->Client_.Impl_->CollectRetryStatSync(status.GetStatus());
status = Retry();
}
return status;
@@ -42,7 +43,7 @@ public:
protected:
TRetryContext(TClient& client, const TRetryOperationSettings& settings)
: TRetryContextBase(settings)
- , Client(client)
+ , Client_(client)
{}
virtual TStatusType Retry() = 0;
@@ -50,21 +51,21 @@ protected:
virtual TStatusType RunOperation() = 0;
void DoBackoff(bool fast) {
- const auto &settings = fast ? this->Settings.FastBackoffSettings_
- : this->Settings.SlowBackoffSettings_;
- Backoff(settings, this->RetryNumber);
+ const auto &settings = fast ? this->Settings_.FastBackoffSettings_
+ : this->Settings_.SlowBackoffSettings_;
+ Backoff(settings, this->RetryNumber_);
}
};
template<typename TClient, typename TOperation, typename TStatusType = TFunctionResult<TOperation>>
class TRetryWithoutSession : public TRetryContext<TClient, TStatusType> {
private:
- const TOperation& Operation;
+ const TOperation& Operation_;
public:
TRetryWithoutSession(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings)
: TRetryContext<TClient, TStatusType>(client, settings)
- , Operation(operation)
+ , Operation_(operation)
{}
protected:
@@ -73,7 +74,11 @@ protected:
}
TStatusType RunOperation() override {
- return Operation(this->Client);
+ if constexpr (TFunctionArgs<TOperation>::Length == 1) {
+ return Operation_(this->Client_);
+ } else {
+ return Operation_(this->Client_, this->GetRemainingTimeout());
+ }
}
};
@@ -83,29 +88,29 @@ class TRetryWithSession : public TRetryContext<TClient, TStatusType> {
using TCreateSessionSettings = typename TClient::TCreateSessionSettings;
private:
- const TOperation& Operation;
- TMaybe<TSession> Session;
+ const TOperation& Operation_;
+ TMaybe<TSession> Session_;
public:
TRetryWithSession(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings)
: TRetryContext<TClient, TStatusType>(client, settings)
- , Operation(operation)
+ , Operation_(operation)
{}
protected:
TStatusType Retry() override {
TMaybe<TStatusType> status;
- if (!Session) {
- auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_);
- auto sessionResult = this->Client.GetSession(settings).GetValueSync();
+ if (!Session_) {
+ auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_);
+ auto sessionResult = this->Client_.GetSession(settings).GetValueSync();
if (sessionResult.IsSuccess()) {
- Session = sessionResult.GetSession();
+ Session_ = sessionResult.GetSession();
}
status = TStatusType(TStatus(sessionResult));
}
- if (Session) {
+ if (Session_) {
status = RunOperation();
}
@@ -113,11 +118,15 @@ protected:
}
TStatusType RunOperation() override {
- return Operation(this->Session.GetRef());
+ if constexpr (TFunctionArgs<TOperation>::Length == 1) {
+ return Operation_(this->Session_.GetRef());
+ } else {
+ return Operation_(this->Session_.GetRef(), this->GetRemainingTimeout());
+ }
}
void Reset() override {
- Session.Clear();
+ Session_.Clear();
}
};
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index b0987066e7e..41a1f0858c3 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -20,6 +20,13 @@ namespace NYdb::NQuery {
using TRetryContextAsync = NRetry::Async::TRetryContext<TQueryClient, TAsyncExecuteQueryResult>;
+NYdb::NRetry::TRetryOperationSettings GetRetrySettings(TDuration timeout, bool isIndempotent) {
+ return NYdb::NRetry::TRetryOperationSettings()
+ .Idempotent(isIndempotent)
+ .GetSessionClientTimeout(timeout)
+ .MaxTimeout(timeout);
+}
+
TCreateSessionSettings::TCreateSessionSettings() {
ClientTimeout_ = TDuration::Seconds(5);
};
@@ -426,6 +433,17 @@ TAsyncExecuteQueryResult TQueryClient::RetryQuery(TQueryFunc&& queryFunc, TRetry
return ctx->Execute();
}
+TAsyncExecuteQueryResult TQueryClient::RetryQuery(const TString& query, const TTxControl& txControl,
+ TDuration timeout, bool isIndempotent)
+{
+ auto settings = GetRetrySettings(timeout, isIndempotent);
+ auto queryFunc = [&query, &txControl](TSession session, TDuration duration) -> TAsyncExecuteQueryResult {
+ return session.ExecuteQuery(query, txControl, TExecuteQuerySettings().ClientTimeout(duration));
+ };
+ TRetryContextAsync::TPtr ctx(new NRetry::Async::TRetryWithSession(*this, std::move(queryFunc), settings));
+ return ctx->Execute();
+}
+
////////////////////////////////////////////////////////////////////////////////
TCreateSessionResult::TCreateSessionResult(TStatus&& status, TSession&& session)
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h
index 31069a26c03..7499f998461 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.h
@@ -84,6 +84,9 @@ public:
TAsyncExecuteQueryResult RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());
+ TAsyncExecuteQueryResult RetryQuery(const TString& query, const TTxControl& txControl,
+ TDuration timeout, bool isIndempotent);
+
NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script,
const TExecuteScriptSettings& settings = TExecuteScriptSettings());
diff --git a/ydb/public/sdk/cpp/client/ydb_retry/retry.h b/ydb/public/sdk/cpp/client/ydb_retry/retry.h
index d879f00e123..1143d137e5e 100644
--- a/ydb/public/sdk/cpp/client/ydb_retry/retry.h
+++ b/ydb/public/sdk/cpp/client/ydb_retry/retry.h
@@ -19,6 +19,7 @@ struct TRetryOperationSettings {
FLUENT_SETTING_DEFAULT(ui32, MaxRetries, 10);
FLUENT_SETTING_DEFAULT(bool, RetryNotFound, true);
FLUENT_SETTING_DEFAULT(TDuration, GetSessionClientTimeout, TDuration::Seconds(5));
+ FLUENT_SETTING_DEFAULT(TDuration, MaxTimeout, TDuration::Max());
FLUENT_SETTING_DEFAULT(TBackoffSettings, FastBackoffSettings, DefaultFastBackoffSettings());
FLUENT_SETTING_DEFAULT(TBackoffSettings, SlowBackoffSettings, DefaultSlowBackoffSettings());
FLUENT_SETTING_FLAG(Idempotent);
diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp
index 3b4bf878f19..97d2b03f33e 100644
--- a/ydb/services/ydb/ydb_table_ut.cpp
+++ b/ydb/services/ydb/ydb_table_ut.cpp
@@ -1993,11 +1993,11 @@ R"___(<main>: Error: Transaction not found: , code: 2015
void CheckRetryResult(const TStatus& status, const TVector<TResultSet>& resultSets, bool expectSuccess)
{
if (expectSuccess) {
- UNIT_ASSERT(status.IsSuccess());
+ UNIT_ASSERT_C(status.IsSuccess(), status);
UNIT_ASSERT_VALUES_EQUAL(resultSets.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(resultSets[0].ColumnsCount(), 3);
} else {
- UNIT_ASSERT(!status.IsSuccess());
+ UNIT_ASSERT_C(!status.IsSuccess(), status);
}
}
@@ -2104,6 +2104,48 @@ R"___(<main>: Error: Transaction not found: , code: 2015
driver.Stop(true);
}
+ Y_UNIT_TEST(RetryOperationLimitedDuration) {
+ TKikimrWithGrpcAndRootSchema server;
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+ NYdb::NTable::TTableClient client(driver);
+
+ const size_t MaxRetries = 3; // OVERLOADED error has slow backoff policy, 2 retries takes about 2 sec
+ const double MaxDurationSec = 0.5; // Less than 2 sec, should be enough to fail on retry with duration limit
+ const auto retrySettings = TRetryOperationSettings().MaxRetries(MaxRetries).Verbose(true);
+ const auto retrySettingsLimited = TRetryOperationSettings(retrySettings).MaxTimeout(TDuration::Seconds(MaxDurationSec));
+ size_t retryNumber = 0;
+
+ // Asynchronous version
+ auto operation = [&retryNumber] (TSession /*session*/) -> TAsyncStatus {
+ if (retryNumber++ < MaxRetries) {
+ return NThreading::MakeFuture(TStatus(EStatus::OVERLOADED, {}));
+ }
+ return NThreading::MakeFuture(TStatus(EStatus::SUCCESS, {}));
+ };
+
+ retryNumber = 0;
+ UNIT_ASSERT(client.RetryOperation(operation, retrySettings).GetValueSync().IsSuccess());
+
+ retryNumber = 0;
+ UNIT_ASSERT(!client.RetryOperation(operation, retrySettingsLimited).GetValueSync().IsSuccess());
+
+ // Synchronous version
+ auto operationSync = [&retryNumber] (TSession /*session*/) -> TStatus {
+ if (retryNumber++ < MaxRetries) {
+ return TStatus(EStatus::OVERLOADED, {});
+ }
+ return TStatus(EStatus::SUCCESS, {});
+ };
+
+ retryNumber = 0;
+ UNIT_ASSERT(client.RetryOperationSync(operationSync, retrySettings).IsSuccess());
+
+ retryNumber = 0;
+ UNIT_ASSERT(!client.RetryOperationSync(operationSync, retrySettingsLimited).IsSuccess());
+
+ driver.Stop(true);
+ }
+
Y_UNIT_TEST(QueryLimits) {
NKikimrConfig::TAppConfig appConfig;
auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();