aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordvrazumov <dvrazumov@yandex-team.com>2023-09-06 23:42:26 +0300
committerdvrazumov <dvrazumov@yandex-team.com>2023-09-06 23:56:44 +0300
commit600f4c814ddc73257214a9120b284acd20645111 (patch)
treeced1af8054c709d217bb5f4d250f7849a3cc9367
parent3833837106583dd450af8c0bc3a02e56bbfb0b85 (diff)
downloadydb-600f4c814ddc73257214a9120b284acd20645111.tar.gz
KIKIMR-19051: made RertyContext as a template to reuse it in QueryClient
RetryContext improvements: * RetryContext is now a template class and may be used in both TableClient and QueryClient * The return type of retry operation is templated so it could return different types: TAsyncStatus/TAsyncExecuteQueryResult/etc * Unified and refactored code
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp31
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h94
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h160
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h110
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp24
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.h17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/ya.make8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_retry/retry.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp302
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h26
-rw-r--r--ydb/services/ydb/ydb_table_ut.cpp20
17 files changed, 515 insertions, 322 deletions
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index a37c71913f0..4dd8970b21b 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -154,21 +154,36 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(count, 2);
}
- Y_UNIT_TEST(ExecuteQuery) {
- auto kikimr = DefaultKikimrRunner();
- auto db = kikimr.GetQueryClient();
-
- auto result = db.ExecuteQuery(R"(
- SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key;
- )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ void CheckQueryResult(TExecuteQueryResult result) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
CompareYson(R"([
[[3u];[1]];
[[4000000003u];[1]]
])", FormatResultSetYson(result.GetResultSet(0)));
}
+ Y_UNIT_TEST(ExecuteQuery) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ const TString query = "SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key";
+ auto result = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ CheckQueryResult(result);
+ }
+
+ Y_UNIT_TEST(ExecuteRetryQuery) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ const TString query = "SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key";
+ auto queryFunc = [&query](TSession session) -> TAsyncExecuteQueryResult {
+ return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx());
+ };
+ auto resultRetryFunc = db.RetryQuery(std::move(queryFunc)).GetValueSync();
+ CheckQueryResult(resultRetryFunc);
+ }
+
Y_UNIT_TEST(ExecuteQueryPg) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h
index 490f7a746cf..09c3259171e 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h
@@ -1,6 +1,15 @@
#pragma once
+#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
+
+#include <library/cpp/threading/future/core/fwd.h>
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
#include <util/system/types.h>
+
#include <functional>
#include <memory>
@@ -10,10 +19,91 @@ class IClientImplCommon;
namespace NYdb::NRetry {
-struct TBackoffSettings;
ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber);
void Backoff(const NRetry::TBackoffSettings& settings, ui32 retryNumber);
void AsyncBackoff(std::shared_ptr<IClientImplCommon> client, const TBackoffSettings& settings,
ui32 retryNumber, const std::function<void()>& fn);
-}
+enum class NextStep {
+ RetryImmediately,
+ RetryFastBackoff,
+ RetrySlowBackoff,
+ Finish,
+};
+
+template <typename TClient>
+class TRetryContextBase : TNonCopyable {
+protected:
+ TClient Client;
+ TRetryOperationSettings Settings;
+ ui32 RetryNumber;
+
+protected:
+ TRetryContextBase(const TClient& client, const TRetryOperationSettings& settings)
+ : Client(client)
+ , Settings(settings)
+ , RetryNumber(0)
+ {}
+
+ virtual void Reset() {}
+
+ void LogRetry(const TStatus& status) {
+ if (Settings.Verbose_) {
+ Cerr << "Previous query attempt was finished with unsuccessful status "
+ << status.GetStatus() << ": " << status.GetIssues().ToString(true) << Endl;
+ Cerr << "Sending retry attempt " << RetryNumber << " of " << Settings.MaxRetries_ << Endl;
+ }
+ }
+
+ NextStep GetNextStep(const TStatus& status) {
+ if (status.IsSuccess()) {
+ return NextStep::Finish;
+ }
+ if (RetryNumber >= Settings.MaxRetries_) {
+ return NextStep::Finish;
+ }
+ switch (status.GetStatus()) {
+ case EStatus::ABORTED:
+ return NextStep::RetryImmediately;
+
+ case EStatus::OVERLOADED:
+ case EStatus::CLIENT_RESOURCE_EXHAUSTED:
+ return NextStep::RetrySlowBackoff;
+
+ case EStatus::UNAVAILABLE:
+ return NextStep::RetryFastBackoff;
+
+ case EStatus::BAD_SESSION:
+ case EStatus::SESSION_BUSY:
+ Reset();
+ return NextStep::RetryImmediately;
+
+ case EStatus::NOT_FOUND:
+ if (Settings.RetryNotFound_) {
+ return NextStep::RetryImmediately;
+ } else {
+ return NextStep::Finish;
+ }
+
+ case EStatus::UNDETERMINED:
+ if (Settings.Idempotent_) {
+ return NextStep::RetryFastBackoff;
+ } else {
+ return NextStep::Finish;
+ }
+
+ case EStatus::TRANSPORT_UNAVAILABLE:
+ if (Settings.Idempotent_) {
+ Reset();
+ return NextStep::RetryFastBackoff;
+ } else {
+ return NextStep::Finish;
+ }
+
+ default:
+ return NextStep::Finish;
+ }
+ }
+};
+
+} // namespace NYdb::NRetry
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h
new file mode 100644
index 00000000000..bf928f42587
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h
@@ -0,0 +1,160 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h>
+
+namespace NYdb::NRetry {
+
+template <typename TClient, typename TStatusType>
+class TRetryContextAsync : public TThrRefBase, public TRetryContextBase<TClient> {
+public:
+ using TPtr = TIntrusivePtr<NYdb::NRetry::TRetryContextAsync<TClient, TStatusType>>;
+ using TAsyncStatusType = typename NThreading::TFuture<TStatusType>;
+
+protected:
+ NThreading::TPromise<TStatusType> Promise;
+
+public:
+ TAsyncStatusType GetFuture() {
+ return this->Promise.GetFuture();
+ }
+
+ virtual void Execute() = 0;
+
+protected:
+ explicit TRetryContextAsync(const TClient& client, const TRetryOperationSettings& settings)
+ : TRetryContextBase<TClient>(client, settings)
+ , Promise(NThreading::NewPromise<TStatusType>())
+ {}
+
+ static void DoExecute(TPtr self) {
+ self->Execute();
+ }
+
+ static void DoBackoff(TPtr self, bool fast) {
+ auto backoffSettings = fast ? self->Settings.FastBackoffSettings_
+ : self->Settings.SlowBackoffSettings_;
+ AsyncBackoff(self->Client.Impl_, backoffSettings, self->RetryNumber,
+ [self]() {DoExecute(self);});
+ }
+
+ static void HandleExceptionAsync(TPtr self, std::exception_ptr e) {
+ self->Promise.SetException(e);
+ }
+
+ static void HandleStatusAsync(TPtr self, const TStatusType& status) {
+ auto nextStep = self->GetNextStep(status);
+ if (nextStep != NextStep::Finish) {
+ self->RetryNumber++;
+ self->Client.Impl_->CollectRetryStatAsync(status.GetStatus());
+ self->LogRetry(status);
+ }
+ switch (nextStep) {
+ case NextStep::RetryImmediately:
+ return DoExecute(self);
+ case NextStep::RetryFastBackoff:
+ return DoBackoff(self, true);
+ case NextStep::RetrySlowBackoff:
+ return DoBackoff(self, false);
+ case NextStep::Finish:
+ return self->Promise.SetValue(status);
+ }
+ }
+
+ static void DoRunOperation(TPtr self) {
+ self->RunOperation().Subscribe(
+ [self](const TAsyncStatusType& result) {
+ try {
+ HandleStatusAsync(self, result.GetValue());
+ } catch (...) {
+ HandleExceptionAsync(self, std::current_exception());
+ }
+ }
+ );
+ }
+
+ virtual TAsyncStatusType RunOperation() = 0;
+};
+
+template <typename TClient, typename TOperation, typename TStatusType>
+class TRetryWithoutSessionAsync : public TRetryContextAsync<TClient, TStatusType> {
+ using TRetryContextAsync = TRetryContextAsync<TClient, TStatusType>;
+ using TPtr = typename TRetryContextAsync::TPtr;
+ using TAsyncStatusType = typename TRetryContextAsync::TAsyncStatusType;
+
+private:
+ TOperation Operation;
+
+public:
+ explicit TRetryWithoutSessionAsync(const TClient& client, TOperation&& operation,
+ const TRetryOperationSettings& settings)
+ : TRetryContextAsync(client, settings)
+ , Operation(operation)
+ {}
+
+ void Execute() override {
+ TPtr self(this);
+ TRetryContextAsync::DoRunOperation(self);
+ }
+
+private:
+ TAsyncStatusType RunOperation() override {
+ return Operation(this->Client);
+ }
+};
+
+template <typename TClient, typename TOperation, typename TStatusType = typename std::result_of<TOperation>::type>
+class TRetryWithSessionAsync : public TRetryContextAsync<TClient, TStatusType> {
+ using TRetryContextAsync = TRetryContextAsync<TClient, TStatusType>;
+ using TPtr = typename TRetryContextAsync::TPtr;
+ using TAsyncStatusType = typename TRetryContextAsync::TAsyncStatusType;
+ using TSession = typename TClient::TSession;
+ using TCreateSessionSettings = typename TClient::TCreateSessionSettings;
+ using TAsyncCreateSessionResult = typename TClient::TAsyncCreateSessionResult;
+
+private:
+ TOperation Operation;
+ TMaybe<TSession> Session;
+
+public:
+ explicit TRetryWithSessionAsync(const TClient& client, TOperation&& operation,
+ const TRetryOperationSettings& settings)
+ : TRetryContextAsync(client, settings)
+ , Operation(operation)
+ {}
+
+ void Execute() override {
+ TPtr self(this);
+ if (!Session) {
+ auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_);
+ this->Client.GetSession(settings).Subscribe(
+ [self](const TAsyncCreateSessionResult& resultFuture) {
+ try {
+ auto& result = resultFuture.GetValue();
+ if (!result.IsSuccess()) {
+ return TRetryContextAsync::HandleStatusAsync(self, TStatusType(TStatus(result)));
+ }
+
+ auto* myself = dynamic_cast<TRetryWithSessionAsync*>(self.Get());
+ myself->Session = result.GetSession();
+ myself->DoRunOperation(self);
+ } catch (...) {
+ return TRetryContextAsync::HandleExceptionAsync(self, std::current_exception());
+ }
+ }
+ );
+ } else {
+ TRetryContextAsync::DoRunOperation(self);
+ }
+ }
+
+private:
+ void Reset() override {
+ Session.Clear();
+ }
+
+ TAsyncStatusType RunOperation() override {
+ return Operation(this->Session.GetRef());
+ }
+};
+
+} // namespace NYdb::NRetry
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h
new file mode 100644
index 00000000000..41aad0d31d1
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h
@@ -0,0 +1,110 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h>
+#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
+
+#include <util/generic/maybe.h>
+
+namespace NYdb::NRetry {
+
+template <typename TClient>
+class TRetryContextSync : public TRetryContextBase<TClient> {
+public:
+ TRetryContextSync(TClient& client, const TRetryOperationSettings& settings)
+ : TRetryContextBase<TClient>(client, settings)
+ {}
+
+ TStatus Execute() {
+ TStatus status = RunOperation(); // first attempt
+ for (this->RetryNumber = 0; this->RetryNumber <= this->Settings.MaxRetries_;) {
+ auto nextStep = this->GetNextStep(status);
+ switch (nextStep) {
+ case NextStep::RetryImmediately:
+ break;
+ case NextStep::RetryFastBackoff:
+ DoBackoff(true);
+ break;
+ case NextStep::RetrySlowBackoff:
+ DoBackoff(false);
+ break;
+ case NextStep::Finish:
+ return status;
+ }
+ // make next retry
+ this->RetryNumber++;
+ this->LogRetry(status);
+ this->Client.Impl_->CollectRetryStatSync(status.GetStatus());
+ status = RunOperation();
+ }
+ return status;
+ }
+
+protected:
+ void DoBackoff(bool fast) {
+ const auto &settings = fast ? this->Settings.FastBackoffSettings_
+ : this->Settings.SlowBackoffSettings_;
+ Backoff(settings, this->RetryNumber);
+ }
+
+ virtual TStatus RunOperation() = 0;
+};
+
+template<typename TClient, typename TOperation>
+class TRetryWithoutSessionSync : public TRetryContextSync<TClient> {
+private:
+ const TOperation& Operation;
+
+public:
+ TRetryWithoutSessionSync(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings)
+ : TRetryContextSync<TClient>(client, settings)
+ , Operation(operation)
+ {}
+
+protected:
+ TStatus RunOperation() override {
+ return Operation(this->Client);
+ }
+};
+
+template<typename TClient, typename TOperation>
+class TRetryWithSessionSync : public TRetryContextSync<TClient> {
+ using TSession = typename TClient::TSession;
+ using TCreateSessionSettings = typename TClient::TCreateSessionSettings;
+
+private:
+ const TOperation& Operation;
+ TMaybe<TSession> Session;
+
+public:
+ TRetryWithSessionSync(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings)
+ : TRetryContextSync<TClient>(client, settings)
+ , Operation(operation)
+ {}
+
+protected:
+ TStatus RunOperation() override {
+ TMaybe<NYdb::TStatus> status;
+
+ if (!Session) {
+ auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_);
+ auto sessionResult = this->Client.GetSession(settings).GetValueSync();
+ if (sessionResult.IsSuccess()) {
+ Session = sessionResult.GetSession();
+ }
+ status = sessionResult;
+ }
+
+ if (Session) {
+ status = Operation(Session.GetRef());
+ }
+
+ return *status;
+ }
+
+ void Reset() override {
+ Session.Clear();
+ }
+};
+
+} // namespace NYdb::NRetry
diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt
index 7a553f439fd..6b201d4ea70 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt
@@ -12,8 +12,14 @@ add_library(cpp-client-ydb_query)
target_link_libraries(cpp-client-ydb_query PUBLIC
contrib-libs-cxxsupp
yutil
- client-ydb_query-impl
+ impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
+ impl-ydb_internal-session_pool
+ impl-ydb_internal-retry
cpp-client-ydb_common_client
+ cpp-client-ydb_driver
+ client-ydb_query-impl
+ cpp-client-ydb_result
client-ydb_types-operation
)
target_sources(cpp-client-ydb_query PRIVATE
diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt
index 077dff767aa..c9cf47ea154 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt
@@ -13,8 +13,14 @@ target_link_libraries(cpp-client-ydb_query PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- client-ydb_query-impl
+ impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
+ impl-ydb_internal-session_pool
+ impl-ydb_internal-retry
cpp-client-ydb_common_client
+ cpp-client-ydb_driver
+ client-ydb_query-impl
+ cpp-client-ydb_result
client-ydb_types-operation
)
target_sources(cpp-client-ydb_query PRIVATE
diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt
index 077dff767aa..c9cf47ea154 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt
@@ -13,8 +13,14 @@ target_link_libraries(cpp-client-ydb_query PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- client-ydb_query-impl
+ impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
+ impl-ydb_internal-session_pool
+ impl-ydb_internal-retry
cpp-client-ydb_common_client
+ cpp-client-ydb_driver
+ client-ydb_query-impl
+ cpp-client-ydb_result
client-ydb_types-operation
)
target_sources(cpp-client-ydb_query PRIVATE
diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt
index 7a553f439fd..6b201d4ea70 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt
@@ -12,8 +12,14 @@ add_library(cpp-client-ydb_query)
target_link_libraries(cpp-client-ydb_query PUBLIC
contrib-libs-cxxsupp
yutil
- client-ydb_query-impl
+ impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
+ impl-ydb_internal-session_pool
+ impl-ydb_internal-retry
cpp-client-ydb_common_client
+ cpp-client-ydb_driver
+ client-ydb_query-impl
+ cpp-client-ydb_result
client-ydb_types-operation
)
target_sources(cpp-client-ydb_query PRIVATE
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index 12ea4166dc5..a223f874ec5 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -3,17 +3,24 @@
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h>
-#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h>
#undef INCLUDE_YDB_INTERNAL_H
#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#include <ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h>
+#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h>
namespace NYdb::NQuery {
+using TRetryContext = NRetry::TRetryContextAsync<TQueryClient, TExecuteQueryResult>;
+using TRetryContextWithSession = NRetry::TRetryWithSessionAsync<TQueryClient, TQueryClient::TQueryFunc, TExecuteQueryResult>;
+
TCreateSessionSettings::TCreateSessionSettings() {
ClientTimeout_ = TDuration::Seconds(5);
};
@@ -342,6 +349,14 @@ public:
), NSessionPool::PERIODIC_ACTION_INTERVAL);
}
+ void CollectRetryStatAsync(EStatus status) {
+ Y_UNUSED(status);
+ }
+
+ void CollectRetryStatSync(EStatus status) {
+ Y_UNUSED(status);
+ }
+
private:
TClientSettings Settings_;
NSessionPool::TSessionPool SessionPool_;
@@ -407,6 +422,13 @@ i64 TQueryClient::GetCurrentPoolSize() const {
return Impl_->GetCurrentPoolSize();
}
+TAsyncExecuteQueryResult TQueryClient::RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings)
+{
+ TRetryContext::TPtr ctx(new TRetryContextWithSession(*this, std::move(queryFunc), settings));
+ ctx->Execute();
+ return ctx->GetFuture();
+}
+
////////////////////////////////////////////////////////////////////////////////
TCreateSessionResult::TCreateSessionResult(TStatus&& status, TSession&& session)
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h
index 88e73e6689a..dc981bdc8f9 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.h
@@ -5,12 +5,19 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
+#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/request_settings.h>
#include <util/generic/maybe.h>
#include <util/generic/ptr.h>
namespace NYdb {
class TProtoAccessor;
+
+ namespace NRetry {
+ template <typename TClient, typename TStatusType>
+ class TRetryContextAsync;
+ }
}
namespace NYdb::NQuery {
@@ -21,6 +28,7 @@ struct TCreateSessionSettings : public TSimpleRequestSettings<TCreateSessionSett
class TCreateSessionResult;
using TAsyncCreateSessionResult = NThreading::TFuture<TCreateSessionResult>;
+using TRetryOperationSettings = NYdb::NRetry::TRetryOperationSettings;
struct TSessionPoolSettings {
using TSelf = TSessionPoolSettings;
@@ -49,9 +57,16 @@ struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> {
class TSession;
class TQueryClient {
friend class TSession;
+ friend class NRetry::TRetryContextAsync<TQueryClient, TExecuteQueryResult>;
+
public:
+ using TQueryFunc = std::function<TAsyncExecuteQueryResult(TSession session)>;
+ using TQueryWithoutSessionFunc = std::function<TAsyncExecuteQueryResult(TQueryClient& client)>;
using TSettings = TClientSettings;
using TSession = TSession;
+ using TCreateSessionSettings = TCreateSessionSettings;
+ using TAsyncCreateSessionResult = TAsyncCreateSessionResult;
+
public:
TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings());
@@ -67,6 +82,8 @@ public:
TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl,
const TParams& params, const TExecuteQuerySettings& settings = TExecuteQuerySettings());
+ TAsyncExecuteQueryResult RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings());
+
NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script,
const TExecuteScriptSettings& settings = TExecuteScriptSettings());
diff --git a/ydb/public/sdk/cpp/client/ydb_query/ya.make b/ydb/public/sdk/cpp/client/ydb_query/ya.make
index 04cf2248245..12ee318af27 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_query/ya.make
@@ -12,8 +12,14 @@ SRCS(
)
PEERDIR(
- ydb/public/sdk/cpp/client/ydb_query/impl
+ ydb/public/sdk/cpp/client/impl/ydb_internal/make_request
+ ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common
+ ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool
+ ydb/public/sdk/cpp/client/impl/ydb_internal/retry
ydb/public/sdk/cpp/client/ydb_common_client
+ ydb/public/sdk/cpp/client/ydb_driver
+ ydb/public/sdk/cpp/client/ydb_query/impl
+ ydb/public/sdk/cpp/client/ydb_result
ydb/public/sdk/cpp/client/ydb_types/operation
)
diff --git a/ydb/public/sdk/cpp/client/ydb_retry/retry.h b/ydb/public/sdk/cpp/client/ydb_retry/retry.h
index 7a6cdb61e95..d879f00e123 100644
--- a/ydb/public/sdk/cpp/client/ydb_retry/retry.h
+++ b/ydb/public/sdk/cpp/client/ydb_retry/retry.h
@@ -39,4 +39,4 @@ struct TRetryOperationSettings {
}
};
-};
+} // namespace NYdb::NRetry
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index 8360a6228b6..871ce90bebc 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -1147,5 +1147,13 @@ TMaybe<TString> TTableClient::TImpl::GetQueryText(const TDataQuery& queryData) {
return queryData.GetText();
}
+void TTableClient::TImpl::CollectRetryStatAsync(EStatus status) {
+ RetryOperationStatCollector.IncAsyncRetryOperation(status);
}
+
+void TTableClient::TImpl::CollectRetryStatSync(EStatus status) {
+ RetryOperationStatCollector.IncSyncRetryOperation(status);
}
+
+} // namespace NTable
+} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
index a3dd1de2f6b..8de98455558 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
@@ -144,6 +144,9 @@ public:
TAsyncScanQueryPartIterator StreamExecuteScanQuery(const TString& query,
const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
const TStreamExecScanQuerySettings& settings);
+ void CollectRetryStatAsync(EStatus status);
+ void CollectRetryStatSync(EStatus status);
+
public:
TClientSettings Settings_;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 0e466de52fe..07d926857e0 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -5,6 +5,8 @@
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h>
#undef INCLUDE_YDB_INTERNAL_H
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
@@ -34,6 +36,13 @@ namespace NTable {
using namespace NThreading;
using namespace NSessionPool;
+using TRetryContext = NRetry::TRetryContextAsync<TTableClient, TStatus>;
+using TRetryWithSession = NRetry::TRetryWithSessionAsync<TTableClient, TTableClient::TOperationFunc, TStatus>;
+using TRetryWithoutSession = NRetry::TRetryWithoutSessionAsync<TTableClient, TTableClient::TOperationWithoutSessionFunc, TStatus>;
+
+using TRetryWithSessionSync = NRetry::TRetryWithSessionSync<TTableClient, TTableClient::TOperationSyncFunc>;
+using TRetryWithoutSessionSync = NRetry::TRetryWithoutSessionSync<TTableClient, TTableClient::TOperationWithoutSessionSyncFunc>;
+
////////////////////////////////////////////////////////////////////////////////
class TStorageSettings::TImpl {
@@ -1374,307 +1383,26 @@ TTypeBuilder TTableClient::GetTypeBuilder() {
////////////////////////////////////////////////////////////////////////////////
-struct TRetryState {
- using TAsyncFunc = std::function<TAsyncStatus()>;
- using THandleStatusFunc = std::function<TAsyncStatus(const std::shared_ptr<TRetryState>& state,
- const TStatus& status, const TAsyncFunc& func, ui32 retryNumber)>;
-
- TMaybe<TSession> Session;
- THandleStatusFunc HandleStatusFunc;
-};
-
-class TRetryOperationContext : public TThrRefBase, TNonCopyable {
-public:
- using TRetryContextPtr = TIntrusivePtr<TRetryOperationContext>;
-
-protected:
- TRetryOperationSettings Settings;
- TTableClient TableClient;
- NThreading::TPromise<TStatus> Promise;
- ui32 RetryNumber;
-
-public:
- virtual void Execute() = 0;
-
- TAsyncStatus GetFuture() {
- return Promise.GetFuture();
- }
-
-protected:
- TRetryOperationContext(const TRetryOperationSettings& settings,
- const TTableClient& tableClient)
- : Settings(settings)
- , TableClient(tableClient)
- , Promise(NThreading::NewPromise<TStatus>())
- , RetryNumber(0)
- {}
-
- static void RunOp(TRetryContextPtr self) {
- self->Execute();
- }
-
- virtual void Reset() {}
-
- static void DoRetry(TRetryContextPtr self, bool fast) {
- AsyncBackoff(self->TableClient.Impl_,
- fast ? self->Settings.FastBackoffSettings_ : self->Settings.SlowBackoffSettings_,
- self->RetryNumber,
- [self]() {
- RunOp(self);
- }
- );
- }
-
- static void HandleStatus(TRetryContextPtr self, const TStatus& status) {
- if (status.IsSuccess()) {
- return self->Promise.SetValue(status);
- }
-
- if (self->RetryNumber >= self->Settings.MaxRetries_) {
- return self->Promise.SetValue(status);
- }
- if (self->Settings.Verbose_) {
- Cerr << "Previous query attempt was finished with unsuccessful status: "
- << status.GetIssues().ToString() << ". Status is " << status.GetStatus() << "."
- << "Send retry attempt " << self->RetryNumber << " of " << self->Settings.MaxRetries_ << Endl;
- }
- self->RetryNumber++;
- self->TableClient.Impl_->RetryOperationStatCollector.IncAsyncRetryOperation(status.GetStatus());
-
- switch (status.GetStatus()) {
- case EStatus::ABORTED:
- return RunOp(self);
-
- case EStatus::OVERLOADED:
- case EStatus::CLIENT_RESOURCE_EXHAUSTED:
- return DoRetry(self, false);
-
- case EStatus::UNAVAILABLE:
- return DoRetry(self, true);
-
- case EStatus::BAD_SESSION:
- case EStatus::SESSION_BUSY:
- self->Reset();
- return RunOp(self);
-
- case EStatus::NOT_FOUND:
- return self->Settings.RetryNotFound_
- ? RunOp(self)
- : self->Promise.SetValue(status);
-
- case EStatus::UNDETERMINED:
- return self->Settings.Idempotent_
- ? DoRetry(self, true)
- : self->Promise.SetValue(status);
-
- case EStatus::TRANSPORT_UNAVAILABLE:
- if (self->Settings.Idempotent_) {
- self->Reset();
- return DoRetry(self, true);
- } else {
- return self->Promise.SetValue(status);
- }
-
- default:
- return self->Promise.SetValue(status);
- }
- }
-
- static void HandleException(TRetryContextPtr self, std::exception_ptr e) {
- self->Promise.SetException(e);
- }
-};
-
-class TRetryOperationWithSession : public TRetryOperationContext {
- using TFunc = TTableClient::TOperationFunc;
-
- TFunc Operation;
- TMaybe<TSession> Session;
-
-public:
- explicit TRetryOperationWithSession(TFunc&& operation,
- const TRetryOperationSettings& settings,
- const TTableClient& tableClient)
- : TRetryOperationContext(settings, tableClient)
- , Operation(operation)
- {}
-
- void Execute() override {
- TRetryContextPtr self(this);
- if (!Session) {
- TableClient.GetSession(
- TCreateSessionSettings().ClientTimeout(Settings.GetSessionClientTimeout_)).Subscribe(
- [self](const TAsyncCreateSessionResult& resultFuture) {
- try {
- auto& result = resultFuture.GetValue();
- if (!result.IsSuccess()) {
- return HandleStatus(self, result);
- }
-
- auto* myself = dynamic_cast<TRetryOperationWithSession*>(self.Get());
- myself->Session = result.GetSession();
- myself->DoRunOp(self);
- } catch (...) {
- return HandleException(self, std::current_exception());
- }
- });
- } else {
- DoRunOp(self);
- }
- }
-
-private:
- void Reset() override {
- Session.Clear();
- }
-
- void DoRunOp(TRetryContextPtr self) {
- Operation(Session.GetRef()).Subscribe([self](const TAsyncStatus& result) {
- try {
- return HandleStatus(self, result.GetValue());
- } catch (...) {
- return HandleException(self, std::current_exception());
- }
- });
- }
-};
-
TAsyncStatus TTableClient::RetryOperation(TOperationFunc&& operation, const TRetryOperationSettings& settings) {
- TRetryOperationContext::TRetryContextPtr ctx(new TRetryOperationWithSession(std::move(operation), settings, *this));
+ TRetryContext::TPtr ctx(new TRetryWithSession(*this, std::move(operation), settings));
ctx->Execute();
return ctx->GetFuture();
}
-class TRetryOperationWithoutSession : public TRetryOperationContext {
- using TFunc = TTableClient::TOperationWithoutSessionFunc;
-
- TFunc Operation;
-
-public:
- explicit TRetryOperationWithoutSession(TFunc&& operation,
- const TRetryOperationSettings& settings,
- const TTableClient& tableClient)
- : TRetryOperationContext(settings, tableClient)
- , Operation(operation)
- {}
-
- void Execute() override {
- TRetryContextPtr self(this);
- Operation(TableClient).Subscribe([self](const TAsyncStatus& result) {
- try {
- return HandleStatus(self, result.GetValue());
- } catch (...) {
- return HandleException(self, std::current_exception());
- }
- });
- }
-};
-
TAsyncStatus TTableClient::RetryOperation(TOperationWithoutSessionFunc&& operation, const TRetryOperationSettings& settings) {
- TRetryOperationContext::TRetryContextPtr ctx(new TRetryOperationWithoutSession(std::move(operation), settings, *this));
+ TRetryContext::TPtr ctx(new TRetryWithoutSession(*this, std::move(operation), settings));
ctx->Execute();
return ctx->GetFuture();
}
-TStatus TTableClient::RetryOperationSyncHelper(const TOperationWrapperSyncFunc& operationWrapper, const TRetryOperationSettings& settings) {
- TRetryState retryState;
- TMaybe<NYdb::TStatus> status;
-
- for (ui32 retryNumber = 0; retryNumber <= settings.MaxRetries_; ++retryNumber) {
- status = operationWrapper(retryState);
-
- if (status->IsSuccess()) {
- return *status;
- }
-
- if (retryNumber == settings.MaxRetries_) {
- break;
- }
-
- switch (status->GetStatus()) {
- case EStatus::ABORTED:
- break;
-
- case EStatus::OVERLOADED:
- case EStatus::CLIENT_RESOURCE_EXHAUSTED: {
- Backoff(settings.SlowBackoffSettings_, retryNumber);
- break;
- }
-
- case EStatus::UNAVAILABLE:{
- Backoff(settings.FastBackoffSettings_, retryNumber);
- break;
- }
-
- case EStatus::BAD_SESSION:
- case EStatus::SESSION_BUSY:
- retryState.Session.Clear();
- break;
-
- case EStatus::NOT_FOUND:
- if (!settings.RetryNotFound_) {
- return *status;
- }
- break;
-
- case EStatus::UNDETERMINED:
- if (!settings.Idempotent_) {
- return *status;
- }
- Backoff(settings.FastBackoffSettings_, retryNumber);
- break;
-
- case EStatus::TRANSPORT_UNAVAILABLE:
- if (!settings.Idempotent_) {
- return *status;
- }
- retryState.Session.Clear();
- Backoff(settings.FastBackoffSettings_, retryNumber);
- break;
-
- default:
- return *status;
- }
- Impl_->RetryOperationStatCollector.IncSyncRetryOperation(status->GetStatus());
- }
-
- return *status;
-}
-
TStatus TTableClient::RetryOperationSync(const TOperationWithoutSessionSyncFunc& operation, const TRetryOperationSettings& settings) {
- auto operationWrapper = [this, &operation] (TRetryState&) {
- return operation(*this);
- };
-
- return RetryOperationSyncHelper(operationWrapper, settings);
+ TRetryWithoutSessionSync ctx(*this, operation, settings);
+ return ctx.Execute();
}
TStatus TTableClient::RetryOperationSync(const TOperationSyncFunc& operation, const TRetryOperationSettings& settings) {
- TRetryState retryState;
-
- auto operationWrapper = [this, &operation, &settings] (TRetryState& retryState) {
- TMaybe<NYdb::TStatus> status;
-
- if (!retryState.Session) {
- auto sessionResult = Impl_->GetSession(
- TCreateSessionSettings().ClientTimeout(settings.GetSessionClientTimeout_)).GetValueSync();
- if (sessionResult.IsSuccess()) {
- retryState.Session = sessionResult.GetSession();
- }
- status = sessionResult;
- }
-
- if (retryState.Session) {
- status = operation(retryState.Session.GetRef());
- if (status->IsSuccess()) {
- return *status;
- }
- }
-
- return *status;
- };
-
- return RetryOperationSyncHelper(operationWrapper, settings);
+ TRetryWithSessionSync ctx(*this, operation, settings);
+ return ctx.Execute();
}
NThreading::TFuture<void> TTableClient::Stop() {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index 4892a3a32b9..435c5716bb0 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -5,10 +5,10 @@
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/query_stats/stats.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
-#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h>
#include <util/generic/hash.h>
#include <util/generic/maybe.h>
@@ -30,14 +30,22 @@ class TableIndex;
class TableIndexDescription;
class ValueSinceUnixEpochModeSettings;
-}
-}
+} // namespace Table
+} // namespace Ydb
namespace NYdb {
+namespace NRetry {
+template <typename TClient, typename TStatusType>
+class TRetryContextAsync;
+
+template <typename>
+class TRetryContextSync;
+} // namespace NRetry
+
namespace NScheme {
struct TPermissions;
-}
+} // namespace NScheme
namespace NTable {
@@ -960,7 +968,6 @@ struct TStreamExecScanQuerySettings : public TRequestSettings<TStreamExecScanQue
class TSession;
class TSessionPool;
-struct TRetryState;
enum class EDataFormat {
ApacheArrow = 1,
@@ -971,7 +978,8 @@ class TTableClient {
friend class TSession;
friend class TTransaction;
friend class TSessionPool;
- friend class TRetryOperationContext;
+ friend class NRetry::TRetryContextAsync<TTableClient, TStatus>;
+ friend class NRetry::TRetryContextSync<TTableClient>;
public:
using TOperationFunc = std::function<TAsyncStatus(TSession session)>;
@@ -980,6 +988,8 @@ public:
using TOperationWithoutSessionSyncFunc = std::function<TStatus(TTableClient& tableClient)>;
using TSettings = TClientSettings;
using TSession = TSession;
+ using TCreateSessionSettings = TCreateSessionSettings;
+ using TAsyncCreateSessionResult = TAsyncCreateSessionResult;
public:
TTableClient(const TDriver& driver, const TClientSettings& settings = TClientSettings());
@@ -1060,10 +1070,6 @@ public:
const TStreamExecScanQuerySettings& settings = TStreamExecScanQuerySettings());
private:
- using TOperationWrapperSyncFunc = std::function<TStatus(TRetryState& retryState)>;
- TStatus RetryOperationSyncHelper(const TOperationWrapperSyncFunc& operationWrapper, const TRetryOperationSettings& settings);
-
-private:
class TImpl;
std::shared_ptr<TImpl> Impl_;
};
diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp
index 886aa50d6f0..3b4bf878f19 100644
--- a/ydb/services/ydb/ydb_table_ut.cpp
+++ b/ydb/services/ydb/ydb_table_ut.cpp
@@ -2007,7 +2007,7 @@ R"___(<main>: Error: Transaction not found: , code: 2015
{
size_t retryNumber = 0;
TVector<TResultSet> resultSets;
- auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) {
+ auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) -> TAsyncStatus {
// iterate over all providen statuses and return TStatus to emulate error
if (retryNumber < retriableStatuses.size()) {
TStatus status(retriableStatuses[retryNumber++], {});
@@ -2018,12 +2018,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015
resultSets = queryStatus.GetResultSets();
return NThreading::MakeFuture<TStatus>(queryStatus);
};
- auto operationWithoutSession = [&operation] (TTableClient client) {
+ auto operationWithoutSession = [&operation] (TTableClient client) -> TAsyncStatus {
auto session = client.CreateSession().GetValueSync().GetSession();
return operation(session);
};
- const auto retrySettings = settings.MaxRetries(retriableStatuses.size() + 1).Verbose(true);
+ const auto retrySettings = settings.MaxRetries(retriableStatuses.size()).Verbose(true);
auto result = client.RetryOperation(operation, retrySettings);
CheckRetryResult(result.GetValueSync(), resultSets, expectSuccess);
@@ -2038,7 +2038,7 @@ R"___(<main>: Error: Transaction not found: , code: 2015
{
size_t retryNumber = 0;
TVector<TResultSet> resultSets;
- auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) {
+ auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) -> TStatus {
// iterate over all providen statuses and return TStatus to emulate error
if (retryNumber < retriableStatuses.size()) {
TStatus status(retriableStatuses[retryNumber++], {});
@@ -2049,12 +2049,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015
resultSets = queryStatus.GetResultSets();
return TStatus(queryStatus);
};
- auto operationWithoutSession = [&operation] (TTableClient client) {
+ auto operationWithoutSession = [&operation] (TTableClient client) -> TStatus {
auto session = client.CreateSession().GetValueSync().GetSession();
return operation(session);
};
- const auto retrySettings = settings.MaxRetries(retriableStatuses.size() + 1).Verbose(true);
+ const auto retrySettings = settings.MaxRetries(retriableStatuses.size()).Verbose(true);
auto result = client.RetryOperationSync(operation, retrySettings);
CheckRetryResult(result, resultSets, expectSuccess);
@@ -2078,10 +2078,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015
NYdb::NTable::TTableClient client(driver);
TestRetryOperationAsync(client, GetRetriableAlwaysStatuses(), true);
- TestRetryOperationAsync(client, GetRetriableOnOptionStatuses(), false);
TestRetryOperationAsync(client, {EStatus::NOT_FOUND}, true, TRetryOperationSettings().RetryNotFound(true));
TestRetryOperationAsync(client, {EStatus::UNDETERMINED}, true, TRetryOperationSettings().Idempotent(true));
TestRetryOperationAsync(client, {EStatus::TRANSPORT_UNAVAILABLE}, true, TRetryOperationSettings().Idempotent(true));
+ TestRetryOperationAsync(client, {EStatus::NOT_FOUND}, false, TRetryOperationSettings().RetryNotFound(false));
+ TestRetryOperationAsync(client, {EStatus::UNDETERMINED}, false, TRetryOperationSettings().Idempotent(false));
+ TestRetryOperationAsync(client, {EStatus::TRANSPORT_UNAVAILABLE}, false, TRetryOperationSettings().Idempotent(false));
driver.Stop(true);
}
@@ -2092,10 +2094,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015
NYdb::NTable::TTableClient client(driver);
TestRetryOperationSync(client, GetRetriableAlwaysStatuses(), true);
- TestRetryOperationSync(client, GetRetriableOnOptionStatuses(), false);
TestRetryOperationSync(client, {EStatus::NOT_FOUND}, true, TRetryOperationSettings().RetryNotFound(true));
TestRetryOperationSync(client, {EStatus::UNDETERMINED}, true, TRetryOperationSettings().Idempotent(true));
TestRetryOperationSync(client, {EStatus::TRANSPORT_UNAVAILABLE}, true, TRetryOperationSettings().Idempotent(true));
+ TestRetryOperationSync(client, {EStatus::NOT_FOUND}, false, TRetryOperationSettings().RetryNotFound(false));
+ TestRetryOperationSync(client, {EStatus::UNDETERMINED}, false, TRetryOperationSettings().Idempotent(false));
+ TestRetryOperationSync(client, {EStatus::TRANSPORT_UNAVAILABLE}, false, TRetryOperationSettings().Idempotent(false));
driver.Stop(true);
}