diff options
author | dvrazumov <dvrazumov@yandex-team.com> | 2023-09-06 23:42:26 +0300 |
---|---|---|
committer | dvrazumov <dvrazumov@yandex-team.com> | 2023-09-06 23:56:44 +0300 |
commit | 600f4c814ddc73257214a9120b284acd20645111 (patch) | |
tree | ced1af8054c709d217bb5f4d250f7849a3cc9367 | |
parent | 3833837106583dd450af8c0bc3a02e56bbfb0b85 (diff) | |
download | ydb-600f4c814ddc73257214a9120b284acd20645111.tar.gz |
KIKIMR-19051: made RertyContext as a template to reuse it in QueryClient
RetryContext improvements:
* RetryContext is now a template class and may be used in both TableClient and QueryClient
* The return type of retry operation is templated so it could return different types: TAsyncStatus/TAsyncExecuteQueryResult/etc
* Unified and refactored code
17 files changed, 515 insertions, 322 deletions
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index a37c71913f0..4dd8970b21b 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -154,21 +154,36 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL(count, 2); } - Y_UNIT_TEST(ExecuteQuery) { - auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetQueryClient(); - - auto result = db.ExecuteQuery(R"( - SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key; - )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + void CheckQueryResult(TExecuteQueryResult result) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - + UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1); CompareYson(R"([ [[3u];[1]]; [[4000000003u];[1]] ])", FormatResultSetYson(result.GetResultSet(0))); } + Y_UNIT_TEST(ExecuteQuery) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + const TString query = "SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key"; + auto result = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + CheckQueryResult(result); + } + + Y_UNIT_TEST(ExecuteRetryQuery) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + const TString query = "SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key"; + auto queryFunc = [&query](TSession session) -> TAsyncExecuteQueryResult { + return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()); + }; + auto resultRetryFunc = db.RetryQuery(std::move(queryFunc)).GetValueSync(); + CheckQueryResult(resultRetryFunc); + } + Y_UNIT_TEST(ExecuteQueryPg) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h index 490f7a746cf..09c3259171e 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h @@ -1,6 +1,15 @@ #pragma once +#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h> +#include <ydb/public/sdk/cpp/client/ydb_types/fluent_settings_helpers.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> + +#include <library/cpp/threading/future/core/fwd.h> +#include <util/datetime/base.h> +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> #include <util/system/types.h> + #include <functional> #include <memory> @@ -10,10 +19,91 @@ class IClientImplCommon; namespace NYdb::NRetry { -struct TBackoffSettings; ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber); void Backoff(const NRetry::TBackoffSettings& settings, ui32 retryNumber); void AsyncBackoff(std::shared_ptr<IClientImplCommon> client, const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn); -} +enum class NextStep { + RetryImmediately, + RetryFastBackoff, + RetrySlowBackoff, + Finish, +}; + +template <typename TClient> +class TRetryContextBase : TNonCopyable { +protected: + TClient Client; + TRetryOperationSettings Settings; + ui32 RetryNumber; + +protected: + TRetryContextBase(const TClient& client, const TRetryOperationSettings& settings) + : Client(client) + , Settings(settings) + , RetryNumber(0) + {} + + virtual void Reset() {} + + void LogRetry(const TStatus& status) { + if (Settings.Verbose_) { + Cerr << "Previous query attempt was finished with unsuccessful status " + << status.GetStatus() << ": " << status.GetIssues().ToString(true) << Endl; + Cerr << "Sending retry attempt " << RetryNumber << " of " << Settings.MaxRetries_ << Endl; + } + } + + NextStep GetNextStep(const TStatus& status) { + if (status.IsSuccess()) { + return NextStep::Finish; + } + if (RetryNumber >= Settings.MaxRetries_) { + return NextStep::Finish; + } + switch (status.GetStatus()) { + case EStatus::ABORTED: + return NextStep::RetryImmediately; + + case EStatus::OVERLOADED: + case EStatus::CLIENT_RESOURCE_EXHAUSTED: + return NextStep::RetrySlowBackoff; + + case EStatus::UNAVAILABLE: + return NextStep::RetryFastBackoff; + + case EStatus::BAD_SESSION: + case EStatus::SESSION_BUSY: + Reset(); + return NextStep::RetryImmediately; + + case EStatus::NOT_FOUND: + if (Settings.RetryNotFound_) { + return NextStep::RetryImmediately; + } else { + return NextStep::Finish; + } + + case EStatus::UNDETERMINED: + if (Settings.Idempotent_) { + return NextStep::RetryFastBackoff; + } else { + return NextStep::Finish; + } + + case EStatus::TRANSPORT_UNAVAILABLE: + if (Settings.Idempotent_) { + Reset(); + return NextStep::RetryFastBackoff; + } else { + return NextStep::Finish; + } + + default: + return NextStep::Finish; + } + } +}; + +} // namespace NYdb::NRetry diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h new file mode 100644 index 00000000000..bf928f42587 --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h @@ -0,0 +1,160 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h> + +namespace NYdb::NRetry { + +template <typename TClient, typename TStatusType> +class TRetryContextAsync : public TThrRefBase, public TRetryContextBase<TClient> { +public: + using TPtr = TIntrusivePtr<NYdb::NRetry::TRetryContextAsync<TClient, TStatusType>>; + using TAsyncStatusType = typename NThreading::TFuture<TStatusType>; + +protected: + NThreading::TPromise<TStatusType> Promise; + +public: + TAsyncStatusType GetFuture() { + return this->Promise.GetFuture(); + } + + virtual void Execute() = 0; + +protected: + explicit TRetryContextAsync(const TClient& client, const TRetryOperationSettings& settings) + : TRetryContextBase<TClient>(client, settings) + , Promise(NThreading::NewPromise<TStatusType>()) + {} + + static void DoExecute(TPtr self) { + self->Execute(); + } + + static void DoBackoff(TPtr self, bool fast) { + auto backoffSettings = fast ? self->Settings.FastBackoffSettings_ + : self->Settings.SlowBackoffSettings_; + AsyncBackoff(self->Client.Impl_, backoffSettings, self->RetryNumber, + [self]() {DoExecute(self);}); + } + + static void HandleExceptionAsync(TPtr self, std::exception_ptr e) { + self->Promise.SetException(e); + } + + static void HandleStatusAsync(TPtr self, const TStatusType& status) { + auto nextStep = self->GetNextStep(status); + if (nextStep != NextStep::Finish) { + self->RetryNumber++; + self->Client.Impl_->CollectRetryStatAsync(status.GetStatus()); + self->LogRetry(status); + } + switch (nextStep) { + case NextStep::RetryImmediately: + return DoExecute(self); + case NextStep::RetryFastBackoff: + return DoBackoff(self, true); + case NextStep::RetrySlowBackoff: + return DoBackoff(self, false); + case NextStep::Finish: + return self->Promise.SetValue(status); + } + } + + static void DoRunOperation(TPtr self) { + self->RunOperation().Subscribe( + [self](const TAsyncStatusType& result) { + try { + HandleStatusAsync(self, result.GetValue()); + } catch (...) { + HandleExceptionAsync(self, std::current_exception()); + } + } + ); + } + + virtual TAsyncStatusType RunOperation() = 0; +}; + +template <typename TClient, typename TOperation, typename TStatusType> +class TRetryWithoutSessionAsync : public TRetryContextAsync<TClient, TStatusType> { + using TRetryContextAsync = TRetryContextAsync<TClient, TStatusType>; + using TPtr = typename TRetryContextAsync::TPtr; + using TAsyncStatusType = typename TRetryContextAsync::TAsyncStatusType; + +private: + TOperation Operation; + +public: + explicit TRetryWithoutSessionAsync(const TClient& client, TOperation&& operation, + const TRetryOperationSettings& settings) + : TRetryContextAsync(client, settings) + , Operation(operation) + {} + + void Execute() override { + TPtr self(this); + TRetryContextAsync::DoRunOperation(self); + } + +private: + TAsyncStatusType RunOperation() override { + return Operation(this->Client); + } +}; + +template <typename TClient, typename TOperation, typename TStatusType = typename std::result_of<TOperation>::type> +class TRetryWithSessionAsync : public TRetryContextAsync<TClient, TStatusType> { + using TRetryContextAsync = TRetryContextAsync<TClient, TStatusType>; + using TPtr = typename TRetryContextAsync::TPtr; + using TAsyncStatusType = typename TRetryContextAsync::TAsyncStatusType; + using TSession = typename TClient::TSession; + using TCreateSessionSettings = typename TClient::TCreateSessionSettings; + using TAsyncCreateSessionResult = typename TClient::TAsyncCreateSessionResult; + +private: + TOperation Operation; + TMaybe<TSession> Session; + +public: + explicit TRetryWithSessionAsync(const TClient& client, TOperation&& operation, + const TRetryOperationSettings& settings) + : TRetryContextAsync(client, settings) + , Operation(operation) + {} + + void Execute() override { + TPtr self(this); + if (!Session) { + auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_); + this->Client.GetSession(settings).Subscribe( + [self](const TAsyncCreateSessionResult& resultFuture) { + try { + auto& result = resultFuture.GetValue(); + if (!result.IsSuccess()) { + return TRetryContextAsync::HandleStatusAsync(self, TStatusType(TStatus(result))); + } + + auto* myself = dynamic_cast<TRetryWithSessionAsync*>(self.Get()); + myself->Session = result.GetSession(); + myself->DoRunOperation(self); + } catch (...) { + return TRetryContextAsync::HandleExceptionAsync(self, std::current_exception()); + } + } + ); + } else { + TRetryContextAsync::DoRunOperation(self); + } + } + +private: + void Reset() override { + Session.Clear(); + } + + TAsyncStatusType RunOperation() override { + return Operation(this->Session.GetRef()); + } +}; + +} // namespace NYdb::NRetry diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h new file mode 100644 index 00000000000..41aad0d31d1 --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h @@ -0,0 +1,110 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h> +#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> + +#include <util/generic/maybe.h> + +namespace NYdb::NRetry { + +template <typename TClient> +class TRetryContextSync : public TRetryContextBase<TClient> { +public: + TRetryContextSync(TClient& client, const TRetryOperationSettings& settings) + : TRetryContextBase<TClient>(client, settings) + {} + + TStatus Execute() { + TStatus status = RunOperation(); // first attempt + for (this->RetryNumber = 0; this->RetryNumber <= this->Settings.MaxRetries_;) { + auto nextStep = this->GetNextStep(status); + switch (nextStep) { + case NextStep::RetryImmediately: + break; + case NextStep::RetryFastBackoff: + DoBackoff(true); + break; + case NextStep::RetrySlowBackoff: + DoBackoff(false); + break; + case NextStep::Finish: + return status; + } + // make next retry + this->RetryNumber++; + this->LogRetry(status); + this->Client.Impl_->CollectRetryStatSync(status.GetStatus()); + status = RunOperation(); + } + return status; + } + +protected: + void DoBackoff(bool fast) { + const auto &settings = fast ? this->Settings.FastBackoffSettings_ + : this->Settings.SlowBackoffSettings_; + Backoff(settings, this->RetryNumber); + } + + virtual TStatus RunOperation() = 0; +}; + +template<typename TClient, typename TOperation> +class TRetryWithoutSessionSync : public TRetryContextSync<TClient> { +private: + const TOperation& Operation; + +public: + TRetryWithoutSessionSync(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings) + : TRetryContextSync<TClient>(client, settings) + , Operation(operation) + {} + +protected: + TStatus RunOperation() override { + return Operation(this->Client); + } +}; + +template<typename TClient, typename TOperation> +class TRetryWithSessionSync : public TRetryContextSync<TClient> { + using TSession = typename TClient::TSession; + using TCreateSessionSettings = typename TClient::TCreateSessionSettings; + +private: + const TOperation& Operation; + TMaybe<TSession> Session; + +public: + TRetryWithSessionSync(TClient& client, const TOperation& operation, const TRetryOperationSettings& settings) + : TRetryContextSync<TClient>(client, settings) + , Operation(operation) + {} + +protected: + TStatus RunOperation() override { + TMaybe<NYdb::TStatus> status; + + if (!Session) { + auto settings = TCreateSessionSettings().ClientTimeout(this->Settings.GetSessionClientTimeout_); + auto sessionResult = this->Client.GetSession(settings).GetValueSync(); + if (sessionResult.IsSuccess()) { + Session = sessionResult.GetSession(); + } + status = sessionResult; + } + + if (Session) { + status = Operation(Session.GetRef()); + } + + return *status; + } + + void Reset() override { + Session.Clear(); + } +}; + +} // namespace NYdb::NRetry diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt index 7a553f439fd..6b201d4ea70 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.darwin-x86_64.txt @@ -12,8 +12,14 @@ add_library(cpp-client-ydb_query) target_link_libraries(cpp-client-ydb_query PUBLIC contrib-libs-cxxsupp yutil - client-ydb_query-impl + impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common + impl-ydb_internal-session_pool + impl-ydb_internal-retry cpp-client-ydb_common_client + cpp-client-ydb_driver + client-ydb_query-impl + cpp-client-ydb_result client-ydb_types-operation ) target_sources(cpp-client-ydb_query PRIVATE diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt index 077dff767aa..c9cf47ea154 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-aarch64.txt @@ -13,8 +13,14 @@ target_link_libraries(cpp-client-ydb_query PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil - client-ydb_query-impl + impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common + impl-ydb_internal-session_pool + impl-ydb_internal-retry cpp-client-ydb_common_client + cpp-client-ydb_driver + client-ydb_query-impl + cpp-client-ydb_result client-ydb_types-operation ) target_sources(cpp-client-ydb_query PRIVATE diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt index 077dff767aa..c9cf47ea154 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.linux-x86_64.txt @@ -13,8 +13,14 @@ target_link_libraries(cpp-client-ydb_query PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil - client-ydb_query-impl + impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common + impl-ydb_internal-session_pool + impl-ydb_internal-retry cpp-client-ydb_common_client + cpp-client-ydb_driver + client-ydb_query-impl + cpp-client-ydb_result client-ydb_types-operation ) target_sources(cpp-client-ydb_query PRIVATE diff --git a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt index 7a553f439fd..6b201d4ea70 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_query/CMakeLists.windows-x86_64.txt @@ -12,8 +12,14 @@ add_library(cpp-client-ydb_query) target_link_libraries(cpp-client-ydb_query PUBLIC contrib-libs-cxxsupp yutil - client-ydb_query-impl + impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common + impl-ydb_internal-session_pool + impl-ydb_internal-retry cpp-client-ydb_common_client + cpp-client-ydb_driver + client-ydb_query-impl + cpp-client-ydb_result client-ydb_types-operation ) target_sources(cpp-client-ydb_query PRIVATE diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index 12ea4166dc5..a223f874ec5 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -3,17 +3,24 @@ #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h> -#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h> #undef INCLUDE_YDB_INTERNAL_H #include <ydb/public/lib/operation_id/operation_id.h> #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h> +#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h> namespace NYdb::NQuery { +using TRetryContext = NRetry::TRetryContextAsync<TQueryClient, TExecuteQueryResult>; +using TRetryContextWithSession = NRetry::TRetryWithSessionAsync<TQueryClient, TQueryClient::TQueryFunc, TExecuteQueryResult>; + TCreateSessionSettings::TCreateSessionSettings() { ClientTimeout_ = TDuration::Seconds(5); }; @@ -342,6 +349,14 @@ public: ), NSessionPool::PERIODIC_ACTION_INTERVAL); } + void CollectRetryStatAsync(EStatus status) { + Y_UNUSED(status); + } + + void CollectRetryStatSync(EStatus status) { + Y_UNUSED(status); + } + private: TClientSettings Settings_; NSessionPool::TSessionPool SessionPool_; @@ -407,6 +422,13 @@ i64 TQueryClient::GetCurrentPoolSize() const { return Impl_->GetCurrentPoolSize(); } +TAsyncExecuteQueryResult TQueryClient::RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings) +{ + TRetryContext::TPtr ctx(new TRetryContextWithSession(*this, std::move(queryFunc), settings)); + ctx->Execute(); + return ctx->GetFuture(); +} + //////////////////////////////////////////////////////////////////////////////// TCreateSessionResult::TCreateSessionResult(TStatus&& status, TSession&& session) diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h index 88e73e6689a..dc981bdc8f9 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/ydb_query/client.h @@ -5,12 +5,19 @@ #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_params/params.h> +#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h> +#include <ydb/public/sdk/cpp/client/ydb_types/request_settings.h> #include <util/generic/maybe.h> #include <util/generic/ptr.h> namespace NYdb { class TProtoAccessor; + + namespace NRetry { + template <typename TClient, typename TStatusType> + class TRetryContextAsync; + } } namespace NYdb::NQuery { @@ -21,6 +28,7 @@ struct TCreateSessionSettings : public TSimpleRequestSettings<TCreateSessionSett class TCreateSessionResult; using TAsyncCreateSessionResult = NThreading::TFuture<TCreateSessionResult>; +using TRetryOperationSettings = NYdb::NRetry::TRetryOperationSettings; struct TSessionPoolSettings { using TSelf = TSessionPoolSettings; @@ -49,9 +57,16 @@ struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> { class TSession; class TQueryClient { friend class TSession; + friend class NRetry::TRetryContextAsync<TQueryClient, TExecuteQueryResult>; + public: + using TQueryFunc = std::function<TAsyncExecuteQueryResult(TSession session)>; + using TQueryWithoutSessionFunc = std::function<TAsyncExecuteQueryResult(TQueryClient& client)>; using TSettings = TClientSettings; using TSession = TSession; + using TCreateSessionSettings = TCreateSessionSettings; + using TAsyncCreateSessionResult = TAsyncCreateSessionResult; + public: TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); @@ -67,6 +82,8 @@ public: TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl, const TParams& params, const TExecuteQuerySettings& settings = TExecuteQuerySettings()); + TAsyncExecuteQueryResult RetryQuery(TQueryFunc&& queryFunc, TRetryOperationSettings settings = TRetryOperationSettings()); + NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings = TExecuteScriptSettings()); diff --git a/ydb/public/sdk/cpp/client/ydb_query/ya.make b/ydb/public/sdk/cpp/client/ydb_query/ya.make index 04cf2248245..12ee318af27 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_query/ya.make @@ -12,8 +12,14 @@ SRCS( ) PEERDIR( - ydb/public/sdk/cpp/client/ydb_query/impl + ydb/public/sdk/cpp/client/impl/ydb_internal/make_request + ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common + ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool + ydb/public/sdk/cpp/client/impl/ydb_internal/retry ydb/public/sdk/cpp/client/ydb_common_client + ydb/public/sdk/cpp/client/ydb_driver + ydb/public/sdk/cpp/client/ydb_query/impl + ydb/public/sdk/cpp/client/ydb_result ydb/public/sdk/cpp/client/ydb_types/operation ) diff --git a/ydb/public/sdk/cpp/client/ydb_retry/retry.h b/ydb/public/sdk/cpp/client/ydb_retry/retry.h index 7a6cdb61e95..d879f00e123 100644 --- a/ydb/public/sdk/cpp/client/ydb_retry/retry.h +++ b/ydb/public/sdk/cpp/client/ydb_retry/retry.h @@ -39,4 +39,4 @@ struct TRetryOperationSettings { } }; -}; +} // namespace NYdb::NRetry diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index 8360a6228b6..871ce90bebc 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -1147,5 +1147,13 @@ TMaybe<TString> TTableClient::TImpl::GetQueryText(const TDataQuery& queryData) { return queryData.GetText(); } +void TTableClient::TImpl::CollectRetryStatAsync(EStatus status) { + RetryOperationStatCollector.IncAsyncRetryOperation(status); } + +void TTableClient::TImpl::CollectRetryStatSync(EStatus status) { + RetryOperationStatCollector.IncSyncRetryOperation(status); } + +} // namespace NTable +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h index a3dd1de2f6b..8de98455558 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h @@ -144,6 +144,9 @@ public: TAsyncScanQueryPartIterator StreamExecuteScanQuery(const TString& query, const ::google::protobuf::Map<TString, Ydb::TypedValue>* params, const TStreamExecScanQuerySettings& settings); + void CollectRetryStatAsync(EStatus status); + void CollectRetryStatSync(EStatus status); + public: TClientSettings Settings_; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 0e466de52fe..07d926857e0 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -5,6 +5,8 @@ #include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_async.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/retry/retry_sync.h> #undef INCLUDE_YDB_INTERNAL_H #include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> @@ -34,6 +36,13 @@ namespace NTable { using namespace NThreading; using namespace NSessionPool; +using TRetryContext = NRetry::TRetryContextAsync<TTableClient, TStatus>; +using TRetryWithSession = NRetry::TRetryWithSessionAsync<TTableClient, TTableClient::TOperationFunc, TStatus>; +using TRetryWithoutSession = NRetry::TRetryWithoutSessionAsync<TTableClient, TTableClient::TOperationWithoutSessionFunc, TStatus>; + +using TRetryWithSessionSync = NRetry::TRetryWithSessionSync<TTableClient, TTableClient::TOperationSyncFunc>; +using TRetryWithoutSessionSync = NRetry::TRetryWithoutSessionSync<TTableClient, TTableClient::TOperationWithoutSessionSyncFunc>; + //////////////////////////////////////////////////////////////////////////////// class TStorageSettings::TImpl { @@ -1374,307 +1383,26 @@ TTypeBuilder TTableClient::GetTypeBuilder() { //////////////////////////////////////////////////////////////////////////////// -struct TRetryState { - using TAsyncFunc = std::function<TAsyncStatus()>; - using THandleStatusFunc = std::function<TAsyncStatus(const std::shared_ptr<TRetryState>& state, - const TStatus& status, const TAsyncFunc& func, ui32 retryNumber)>; - - TMaybe<TSession> Session; - THandleStatusFunc HandleStatusFunc; -}; - -class TRetryOperationContext : public TThrRefBase, TNonCopyable { -public: - using TRetryContextPtr = TIntrusivePtr<TRetryOperationContext>; - -protected: - TRetryOperationSettings Settings; - TTableClient TableClient; - NThreading::TPromise<TStatus> Promise; - ui32 RetryNumber; - -public: - virtual void Execute() = 0; - - TAsyncStatus GetFuture() { - return Promise.GetFuture(); - } - -protected: - TRetryOperationContext(const TRetryOperationSettings& settings, - const TTableClient& tableClient) - : Settings(settings) - , TableClient(tableClient) - , Promise(NThreading::NewPromise<TStatus>()) - , RetryNumber(0) - {} - - static void RunOp(TRetryContextPtr self) { - self->Execute(); - } - - virtual void Reset() {} - - static void DoRetry(TRetryContextPtr self, bool fast) { - AsyncBackoff(self->TableClient.Impl_, - fast ? self->Settings.FastBackoffSettings_ : self->Settings.SlowBackoffSettings_, - self->RetryNumber, - [self]() { - RunOp(self); - } - ); - } - - static void HandleStatus(TRetryContextPtr self, const TStatus& status) { - if (status.IsSuccess()) { - return self->Promise.SetValue(status); - } - - if (self->RetryNumber >= self->Settings.MaxRetries_) { - return self->Promise.SetValue(status); - } - if (self->Settings.Verbose_) { - Cerr << "Previous query attempt was finished with unsuccessful status: " - << status.GetIssues().ToString() << ". Status is " << status.GetStatus() << "." - << "Send retry attempt " << self->RetryNumber << " of " << self->Settings.MaxRetries_ << Endl; - } - self->RetryNumber++; - self->TableClient.Impl_->RetryOperationStatCollector.IncAsyncRetryOperation(status.GetStatus()); - - switch (status.GetStatus()) { - case EStatus::ABORTED: - return RunOp(self); - - case EStatus::OVERLOADED: - case EStatus::CLIENT_RESOURCE_EXHAUSTED: - return DoRetry(self, false); - - case EStatus::UNAVAILABLE: - return DoRetry(self, true); - - case EStatus::BAD_SESSION: - case EStatus::SESSION_BUSY: - self->Reset(); - return RunOp(self); - - case EStatus::NOT_FOUND: - return self->Settings.RetryNotFound_ - ? RunOp(self) - : self->Promise.SetValue(status); - - case EStatus::UNDETERMINED: - return self->Settings.Idempotent_ - ? DoRetry(self, true) - : self->Promise.SetValue(status); - - case EStatus::TRANSPORT_UNAVAILABLE: - if (self->Settings.Idempotent_) { - self->Reset(); - return DoRetry(self, true); - } else { - return self->Promise.SetValue(status); - } - - default: - return self->Promise.SetValue(status); - } - } - - static void HandleException(TRetryContextPtr self, std::exception_ptr e) { - self->Promise.SetException(e); - } -}; - -class TRetryOperationWithSession : public TRetryOperationContext { - using TFunc = TTableClient::TOperationFunc; - - TFunc Operation; - TMaybe<TSession> Session; - -public: - explicit TRetryOperationWithSession(TFunc&& operation, - const TRetryOperationSettings& settings, - const TTableClient& tableClient) - : TRetryOperationContext(settings, tableClient) - , Operation(operation) - {} - - void Execute() override { - TRetryContextPtr self(this); - if (!Session) { - TableClient.GetSession( - TCreateSessionSettings().ClientTimeout(Settings.GetSessionClientTimeout_)).Subscribe( - [self](const TAsyncCreateSessionResult& resultFuture) { - try { - auto& result = resultFuture.GetValue(); - if (!result.IsSuccess()) { - return HandleStatus(self, result); - } - - auto* myself = dynamic_cast<TRetryOperationWithSession*>(self.Get()); - myself->Session = result.GetSession(); - myself->DoRunOp(self); - } catch (...) { - return HandleException(self, std::current_exception()); - } - }); - } else { - DoRunOp(self); - } - } - -private: - void Reset() override { - Session.Clear(); - } - - void DoRunOp(TRetryContextPtr self) { - Operation(Session.GetRef()).Subscribe([self](const TAsyncStatus& result) { - try { - return HandleStatus(self, result.GetValue()); - } catch (...) { - return HandleException(self, std::current_exception()); - } - }); - } -}; - TAsyncStatus TTableClient::RetryOperation(TOperationFunc&& operation, const TRetryOperationSettings& settings) { - TRetryOperationContext::TRetryContextPtr ctx(new TRetryOperationWithSession(std::move(operation), settings, *this)); + TRetryContext::TPtr ctx(new TRetryWithSession(*this, std::move(operation), settings)); ctx->Execute(); return ctx->GetFuture(); } -class TRetryOperationWithoutSession : public TRetryOperationContext { - using TFunc = TTableClient::TOperationWithoutSessionFunc; - - TFunc Operation; - -public: - explicit TRetryOperationWithoutSession(TFunc&& operation, - const TRetryOperationSettings& settings, - const TTableClient& tableClient) - : TRetryOperationContext(settings, tableClient) - , Operation(operation) - {} - - void Execute() override { - TRetryContextPtr self(this); - Operation(TableClient).Subscribe([self](const TAsyncStatus& result) { - try { - return HandleStatus(self, result.GetValue()); - } catch (...) { - return HandleException(self, std::current_exception()); - } - }); - } -}; - TAsyncStatus TTableClient::RetryOperation(TOperationWithoutSessionFunc&& operation, const TRetryOperationSettings& settings) { - TRetryOperationContext::TRetryContextPtr ctx(new TRetryOperationWithoutSession(std::move(operation), settings, *this)); + TRetryContext::TPtr ctx(new TRetryWithoutSession(*this, std::move(operation), settings)); ctx->Execute(); return ctx->GetFuture(); } -TStatus TTableClient::RetryOperationSyncHelper(const TOperationWrapperSyncFunc& operationWrapper, const TRetryOperationSettings& settings) { - TRetryState retryState; - TMaybe<NYdb::TStatus> status; - - for (ui32 retryNumber = 0; retryNumber <= settings.MaxRetries_; ++retryNumber) { - status = operationWrapper(retryState); - - if (status->IsSuccess()) { - return *status; - } - - if (retryNumber == settings.MaxRetries_) { - break; - } - - switch (status->GetStatus()) { - case EStatus::ABORTED: - break; - - case EStatus::OVERLOADED: - case EStatus::CLIENT_RESOURCE_EXHAUSTED: { - Backoff(settings.SlowBackoffSettings_, retryNumber); - break; - } - - case EStatus::UNAVAILABLE:{ - Backoff(settings.FastBackoffSettings_, retryNumber); - break; - } - - case EStatus::BAD_SESSION: - case EStatus::SESSION_BUSY: - retryState.Session.Clear(); - break; - - case EStatus::NOT_FOUND: - if (!settings.RetryNotFound_) { - return *status; - } - break; - - case EStatus::UNDETERMINED: - if (!settings.Idempotent_) { - return *status; - } - Backoff(settings.FastBackoffSettings_, retryNumber); - break; - - case EStatus::TRANSPORT_UNAVAILABLE: - if (!settings.Idempotent_) { - return *status; - } - retryState.Session.Clear(); - Backoff(settings.FastBackoffSettings_, retryNumber); - break; - - default: - return *status; - } - Impl_->RetryOperationStatCollector.IncSyncRetryOperation(status->GetStatus()); - } - - return *status; -} - TStatus TTableClient::RetryOperationSync(const TOperationWithoutSessionSyncFunc& operation, const TRetryOperationSettings& settings) { - auto operationWrapper = [this, &operation] (TRetryState&) { - return operation(*this); - }; - - return RetryOperationSyncHelper(operationWrapper, settings); + TRetryWithoutSessionSync ctx(*this, operation, settings); + return ctx.Execute(); } TStatus TTableClient::RetryOperationSync(const TOperationSyncFunc& operation, const TRetryOperationSettings& settings) { - TRetryState retryState; - - auto operationWrapper = [this, &operation, &settings] (TRetryState& retryState) { - TMaybe<NYdb::TStatus> status; - - if (!retryState.Session) { - auto sessionResult = Impl_->GetSession( - TCreateSessionSettings().ClientTimeout(settings.GetSessionClientTimeout_)).GetValueSync(); - if (sessionResult.IsSuccess()) { - retryState.Session = sessionResult.GetSession(); - } - status = sessionResult; - } - - if (retryState.Session) { - status = operation(retryState.Session.GetRef()); - if (status->IsSuccess()) { - return *status; - } - } - - return *status; - }; - - return RetryOperationSyncHelper(operationWrapper, settings); + TRetryWithSessionSync ctx(*this, operation, settings); + return ctx.Execute(); } NThreading::TFuture<void> TTableClient::Stop() { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 4892a3a32b9..435c5716bb0 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -5,10 +5,10 @@ #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/ydb_params/params.h> #include <ydb/public/sdk/cpp/client/ydb_result/result.h> +#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_table/query_stats/stats.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> -#include <ydb/public/sdk/cpp/client/ydb_retry/retry.h> #include <util/generic/hash.h> #include <util/generic/maybe.h> @@ -30,14 +30,22 @@ class TableIndex; class TableIndexDescription; class ValueSinceUnixEpochModeSettings; -} -} +} // namespace Table +} // namespace Ydb namespace NYdb { +namespace NRetry { +template <typename TClient, typename TStatusType> +class TRetryContextAsync; + +template <typename> +class TRetryContextSync; +} // namespace NRetry + namespace NScheme { struct TPermissions; -} +} // namespace NScheme namespace NTable { @@ -960,7 +968,6 @@ struct TStreamExecScanQuerySettings : public TRequestSettings<TStreamExecScanQue class TSession; class TSessionPool; -struct TRetryState; enum class EDataFormat { ApacheArrow = 1, @@ -971,7 +978,8 @@ class TTableClient { friend class TSession; friend class TTransaction; friend class TSessionPool; - friend class TRetryOperationContext; + friend class NRetry::TRetryContextAsync<TTableClient, TStatus>; + friend class NRetry::TRetryContextSync<TTableClient>; public: using TOperationFunc = std::function<TAsyncStatus(TSession session)>; @@ -980,6 +988,8 @@ public: using TOperationWithoutSessionSyncFunc = std::function<TStatus(TTableClient& tableClient)>; using TSettings = TClientSettings; using TSession = TSession; + using TCreateSessionSettings = TCreateSessionSettings; + using TAsyncCreateSessionResult = TAsyncCreateSessionResult; public: TTableClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); @@ -1060,10 +1070,6 @@ public: const TStreamExecScanQuerySettings& settings = TStreamExecScanQuerySettings()); private: - using TOperationWrapperSyncFunc = std::function<TStatus(TRetryState& retryState)>; - TStatus RetryOperationSyncHelper(const TOperationWrapperSyncFunc& operationWrapper, const TRetryOperationSettings& settings); - -private: class TImpl; std::shared_ptr<TImpl> Impl_; }; diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index 886aa50d6f0..3b4bf878f19 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -2007,7 +2007,7 @@ R"___(<main>: Error: Transaction not found: , code: 2015 { size_t retryNumber = 0; TVector<TResultSet> resultSets; - auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) { + auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) -> TAsyncStatus { // iterate over all providen statuses and return TStatus to emulate error if (retryNumber < retriableStatuses.size()) { TStatus status(retriableStatuses[retryNumber++], {}); @@ -2018,12 +2018,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015 resultSets = queryStatus.GetResultSets(); return NThreading::MakeFuture<TStatus>(queryStatus); }; - auto operationWithoutSession = [&operation] (TTableClient client) { + auto operationWithoutSession = [&operation] (TTableClient client) -> TAsyncStatus { auto session = client.CreateSession().GetValueSync().GetSession(); return operation(session); }; - const auto retrySettings = settings.MaxRetries(retriableStatuses.size() + 1).Verbose(true); + const auto retrySettings = settings.MaxRetries(retriableStatuses.size()).Verbose(true); auto result = client.RetryOperation(operation, retrySettings); CheckRetryResult(result.GetValueSync(), resultSets, expectSuccess); @@ -2038,7 +2038,7 @@ R"___(<main>: Error: Transaction not found: , code: 2015 { size_t retryNumber = 0; TVector<TResultSet> resultSets; - auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) { + auto operation = [&retryNumber, &resultSets, &retriableStatuses] (TSession session) -> TStatus { // iterate over all providen statuses and return TStatus to emulate error if (retryNumber < retriableStatuses.size()) { TStatus status(retriableStatuses[retryNumber++], {}); @@ -2049,12 +2049,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015 resultSets = queryStatus.GetResultSets(); return TStatus(queryStatus); }; - auto operationWithoutSession = [&operation] (TTableClient client) { + auto operationWithoutSession = [&operation] (TTableClient client) -> TStatus { auto session = client.CreateSession().GetValueSync().GetSession(); return operation(session); }; - const auto retrySettings = settings.MaxRetries(retriableStatuses.size() + 1).Verbose(true); + const auto retrySettings = settings.MaxRetries(retriableStatuses.size()).Verbose(true); auto result = client.RetryOperationSync(operation, retrySettings); CheckRetryResult(result, resultSets, expectSuccess); @@ -2078,10 +2078,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015 NYdb::NTable::TTableClient client(driver); TestRetryOperationAsync(client, GetRetriableAlwaysStatuses(), true); - TestRetryOperationAsync(client, GetRetriableOnOptionStatuses(), false); TestRetryOperationAsync(client, {EStatus::NOT_FOUND}, true, TRetryOperationSettings().RetryNotFound(true)); TestRetryOperationAsync(client, {EStatus::UNDETERMINED}, true, TRetryOperationSettings().Idempotent(true)); TestRetryOperationAsync(client, {EStatus::TRANSPORT_UNAVAILABLE}, true, TRetryOperationSettings().Idempotent(true)); + TestRetryOperationAsync(client, {EStatus::NOT_FOUND}, false, TRetryOperationSettings().RetryNotFound(false)); + TestRetryOperationAsync(client, {EStatus::UNDETERMINED}, false, TRetryOperationSettings().Idempotent(false)); + TestRetryOperationAsync(client, {EStatus::TRANSPORT_UNAVAILABLE}, false, TRetryOperationSettings().Idempotent(false)); driver.Stop(true); } @@ -2092,10 +2094,12 @@ R"___(<main>: Error: Transaction not found: , code: 2015 NYdb::NTable::TTableClient client(driver); TestRetryOperationSync(client, GetRetriableAlwaysStatuses(), true); - TestRetryOperationSync(client, GetRetriableOnOptionStatuses(), false); TestRetryOperationSync(client, {EStatus::NOT_FOUND}, true, TRetryOperationSettings().RetryNotFound(true)); TestRetryOperationSync(client, {EStatus::UNDETERMINED}, true, TRetryOperationSettings().Idempotent(true)); TestRetryOperationSync(client, {EStatus::TRANSPORT_UNAVAILABLE}, true, TRetryOperationSettings().Idempotent(true)); + TestRetryOperationSync(client, {EStatus::NOT_FOUND}, false, TRetryOperationSettings().RetryNotFound(false)); + TestRetryOperationSync(client, {EStatus::UNDETERMINED}, false, TRetryOperationSettings().Idempotent(false)); + TestRetryOperationSync(client, {EStatus::TRANSPORT_UNAVAILABLE}, false, TRetryOperationSettings().Idempotent(false)); driver.Stop(true); } |