diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-08-03 09:58:10 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-08-03 09:58:10 +0300 |
commit | d9cb06d1157433d746be7c6572c97efdf1fc467b (patch) | |
tree | 5dd1f4ef1ffe46c173e0568ce9ef2382bda02a77 | |
parent | d37d53fe7cc46f079193ea62fc3c5f9f83d4c6ca (diff) | |
download | ydb-d9cb06d1157433d746be7c6572c97efdf1fc467b.tar.gz |
UsePreferredEndpointStrictly
Connect only to an explicitly specified point endpoint.
10 files changed, 57 insertions, 76 deletions
diff --git a/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp b/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp index e830941e29..8e55f8119c 100644 --- a/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp +++ b/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp @@ -103,13 +103,13 @@ public: }; Connections_->RunDeferred<Ydb::ClickhouseInternal::V1::ClickhouseInternalService, Ydb::ClickhouseInternal::ScanRequest, Ydb::ClickhouseInternal::ScanResponse>( - std::move(request), - extractor, - &Ydb::ClickhouseInternal::V1::ClickhouseInternalService::Stub::AsyncScan, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - TEndpointKey(settings.Endpoint_, 0)); + std::move(request), + extractor, + &Ydb::ClickhouseInternal::V1::ClickhouseInternalService::Stub::AsyncScan, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings, TEndpointKey(settings.Endpoint_, 0)) + ); return promise.GetFuture(); } diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp index d44f288da2..08fbb69693 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp @@ -44,17 +44,19 @@ void TDeferredAction::OnAlarm() { Ydb::Operations::GetOperationRequest getOperationRequest; getOperationRequest.set_id(OperationId_); + TRpcRequestSettings settings; + settings.PreferredEndpoint = TEndpointKey(Endpoint_, 0); + Connection_->RunDeferred<Ydb::Operation::V1::OperationService, Ydb::Operations::GetOperationRequest, Ydb::Operations::GetOperationResponse>( std::move(getOperationRequest), std::move(UserResponseCb_), &Ydb::Operation::V1::OperationService::Stub::AsyncGetOperation, DbDriverState_, NextDelay_, - {}, + settings, true, - TEndpointKey(Endpoint_, 0), std::move(Context_)); -} + } void TDeferredAction::OnError() { Y_VERIFY(Connection_); diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h index 40fa6b39f8..19850c8318 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -102,7 +102,7 @@ public: { SetGrpcKeepAlive(clientConfig, GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY, GRpcKeepAlivePermitWithoutCalls_); } else { - auto endpoint = dbState->EndpointPool.GetEndpoint(preferredEndpoint); + auto endpoint = dbState->EndpointPool.GetEndpoint(preferredEndpoint, endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UsePreferredEndpointStrictly); if (!endpoint) { return {nullptr, TEndpointKey()}; } @@ -139,7 +139,6 @@ public: TSimpleRpc<TService, TRequest, TResponse> rpc, TDbDriverStatePtr dbState, const TRpcRequestSettings& requestSettings, - const TEndpointKey& preferredEndpoint, std::shared_ptr<IQueueClientContext> context = nullptr) { using NGrpc::TGrpcStatus; @@ -166,8 +165,6 @@ public: }); } - auto endpointPolicy = requestSettings.EndpointPolicy; - WithServiceConnection<TService>( [this, request = std::move(request), userResponseCb = std::move(userResponseCb), rpc, requestSettings, context = std::move(context), dbState] (TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable -> void { @@ -268,7 +265,7 @@ public: serviceConnection->DoAdvancedRequest(std::move(request), std::move(responseCbLow), rpc, meta, context.get()); - }, dbState, preferredEndpoint, endpointPolicy); + }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy); } template<typename TService, typename TRequest, typename TResponse> @@ -280,7 +277,6 @@ public: TDuration deferredTimeout, const TRpcRequestSettings& requestSettings, bool poll = false, - const TEndpointKey& preferredEndpoint = TEndpointKey(), std::shared_ptr<IQueueClientContext> context = nullptr) { if (!TryCreateContext(context)) { @@ -322,7 +318,6 @@ public: rpc, dbState, requestSettings, - preferredEndpoint, std::move(context)); } @@ -349,7 +344,6 @@ public: rpc, dbState, requestSettings, - TEndpointKey(), nullptr); } @@ -361,7 +355,6 @@ public: TDbDriverStatePtr dbState, TDuration deferredTimeout, const TRpcRequestSettings& requestSettings, - const TEndpointKey& preferredEndpoint = TEndpointKey(), std::shared_ptr<IQueueClientContext> context = nullptr) { auto operationCb = [userResponseCb = std::move(userResponseCb)](Ydb::Operations::Operation* operation, TPlainStatus status) mutable { @@ -381,7 +374,6 @@ public: deferredTimeout, requestSettings, true, // poll - preferredEndpoint, context); } @@ -399,7 +391,6 @@ public: TStreamRpc<TService, TRequest, TResponse, NGrpc::TStreamRequestReadProcessor> rpc, TDbDriverStatePtr dbState, const TRpcRequestSettings& requestSettings, - const TEndpointKey& preferredEndpoint = TEndpointKey(), std::shared_ptr<IQueueClientContext> context = nullptr) { using NGrpc::TGrpcStatus; @@ -411,11 +402,8 @@ public: return; } - auto endpointPolicy = requestSettings.EndpointPolicy; - WithServiceConnection<TService>( - [request, responseCb = std::move(responseCb), rpc, requestSettings, context = std::move(context), dbState] - (TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable { + [request, responseCb = std::move(responseCb), rpc, requestSettings, context = std::move(context), dbState](TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable { if (!status.Ok()) { responseCb(std::move(status), nullptr); return; @@ -490,7 +478,7 @@ public: std::move(rpc), std::move(meta), context.get()); - }, dbState, preferredEndpoint, endpointPolicy); + }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy); } template<class TService, class TRequest, class TResponse, class TCallback> @@ -510,8 +498,6 @@ public: return; } - auto endpointPolicy = requestSettings.EndpointPolicy; - WithServiceConnection<TService>( [connectedCallback = std::move(connectedCallback), rpc, requestSettings, context = std::move(context), dbState] (TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable { @@ -587,7 +573,7 @@ public: std::move(rpc), std::move(meta), context.get()); - }, dbState, TEndpointKey(), endpointPolicy); + }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy); } TAsyncListEndpointsResult GetEndpoints(TDbDriverStatePtr dbState) override; diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h index 9ad9525572..17d0edf089 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/type_switcher.h> @@ -9,20 +10,23 @@ struct TRpcRequestSettings { TStringType TraceId; TStringType RequestType; std::vector<std::pair<TStringType, TStringType>> Header; + TEndpointKey PreferredEndpoint = {}; enum class TEndpointPolicy { - UsePreferredEndpoint, - UseDiscoveryEndpoint // Use single discovery endpoint for request - } EndpointPolicy = TEndpointPolicy::UsePreferredEndpoint; + UsePreferredEndpointOptionally, // Try to use the preferred endpoint + UsePreferredEndpointStrictly, // Use only the preferred endpoint + UseDiscoveryEndpoint // Use single discovery endpoint + } EndpointPolicy = TEndpointPolicy::UsePreferredEndpointOptionally; bool UseAuth = true; TDuration ClientTimeout; - template<typename TRequestSettings> - static TRpcRequestSettings Make(const TRequestSettings& settings) { + template <typename TRequestSettings> + static TRpcRequestSettings Make(const TRequestSettings& settings, const TEndpointKey& preferredEndpoint = {}, TEndpointPolicy endpointPolicy = TEndpointPolicy::UsePreferredEndpointOptionally) { TRpcRequestSettings rpcSettings; rpcSettings.TraceId = settings.TraceId_; rpcSettings.RequestType = settings.RequestType_; rpcSettings.Header = settings.Header_; - rpcSettings.EndpointPolicy = TEndpointPolicy::UsePreferredEndpoint; + rpcSettings.PreferredEndpoint = preferredEndpoint; + rpcSettings.EndpointPolicy = endpointPolicy; rpcSettings.UseAuth = true; rpcSettings.ClientTimeout = settings.ClientTimeout_; return rpcSettings; diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index 914ec96711..b8d12d7371 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.6.0
\ No newline at end of file +2.6.1
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h b/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h index 71fbb7c25c..18d65505bc 100644 --- a/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h +++ b/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h @@ -59,8 +59,7 @@ protected: NThreading::TFuture<TStatus> RunSimple( TRequest&& request, TAsyncRequest<TService, TRequest, TResponse> rpc, - const TRpcRequestSettings& requestSettings = {}, - const TEndpointKey& preferredEndpoint = TEndpointKey()) + const TRpcRequestSettings& requestSettings = {}) { auto promise = NThreading::NewPromise<TStatus>(); @@ -76,8 +75,7 @@ protected: rpc, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - requestSettings, - preferredEndpoint); + requestSettings); return promise.GetFuture(); } diff --git a/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp b/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp index d30774243b..66f0439c6e 100644 --- a/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp +++ b/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp @@ -52,8 +52,7 @@ class TOperationClient::TImpl : public TClientImplCommon<TOperationClient::TImpl extractor, rpc, DbDriverState_, - rpcSettings, - TEndpointKey()); + rpcSettings); return promise.GetFuture(); } @@ -116,8 +115,7 @@ public: extractor, &V1::OperationService::Stub::AsyncListOperations, DbDriverState_, - rpcSettings, - TEndpointKey()); + rpcSettings); return promise.GetFuture(); } diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index 2ca4d63357..12ea4166dc 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -79,8 +79,7 @@ public: responseCb, &V1::QueryService::Stub::AsyncExecuteScript, DbDriverState_, - TRpcRequestSettings::Make(settings), - TEndpointKey()); + TRpcRequestSettings::Make(settings)); return promise.GetFuture(); } @@ -134,8 +133,7 @@ public: extractor, &V1::QueryService::Stub::AsyncFetchScriptResults, DbDriverState_, - rpcSettings, - TEndpointKey()); + rpcSettings); return promise.GetFuture(); } @@ -185,12 +183,11 @@ public: const auto sessionId = resp->session_id(); request.set_session_id(sessionId); - const auto endpointKey = TEndpointKey(endpoint, GetNodeIdFromSession(sessionId)); - auto args = std::make_shared<TSession::TImpl::TAttachSessionArgs>(promise, sessionId, endpoint, client); // Do not pass client timeout here. Session must be alive - static const TRpcRequestSettings rpcSettings; + TRpcRequestSettings rpcSettings; + rpcSettings.PreferredEndpoint = TEndpointKey(endpoint, GetNodeIdFromSession(sessionId)); Connections_->StartReadStream< Ydb::Query::V1::QueryService, @@ -208,8 +205,7 @@ public: }, &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession, DbDriverState_, - rpcSettings, - endpointKey); + rpcSettings); } TAsyncCreateSessionResult CreateAttachedSession(TDuration timeout) { @@ -245,8 +241,7 @@ public: extractor, &V1::QueryService::Stub::AsyncCreateSession, DbDriverState_, - rpcSettings, - TEndpointKey()); + rpcSettings); return promise.GetFuture(); } diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index 478f927f6a..87fd1462f9 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -394,7 +394,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio auto createSessionPromise = NewPromise<TCreateSessionResult>(); auto self = shared_from_this(); - auto rpcSettings = TRpcRequestSettings::Make(settings); + auto rpcSettings = TRpcRequestSettings::Make(settings, TEndpointKey(preferredLocation, 0)); rpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER}); auto createSessionExtractor = [createSessionPromise, self, standalone] @@ -423,8 +423,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio &Ydb::Table::V1::TableService::Stub::AsyncCreateSession, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - rpcSettings, - TEndpointKey(preferredLocation, 0)); + rpcSettings); std::weak_ptr<TDbDriverState> state = DbDriverState_; @@ -466,8 +465,8 @@ TAsyncKeepAliveResult TTableClient::TImpl::KeepAlive(const TSession::TImpl* sess &Ydb::Table::V1::TableService::Stub::AsyncKeepAlive, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session->GetEndpointKey()); + TRpcRequestSettings::Make(settings, session->GetEndpointKey()) + ); return keepAliveResultPromise.GetFuture(); } @@ -619,8 +618,8 @@ TAsyncPrepareQueryResult TTableClient::TImpl::PrepareDataQuery(const TSession& s &Ydb::Table::V1::TableService::Stub::AsyncPrepareDataQuery, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); + TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + ); return promise.GetFuture(); } @@ -667,8 +666,8 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio &Ydb::Table::V1::TableService::Stub::AsyncBeginTransaction, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); + TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + ); return promise.GetFuture(); } @@ -705,8 +704,8 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess &Ydb::Table::V1::TableService::Stub::AsyncCommitTransaction, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); + TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + ); return promise.GetFuture(); } @@ -721,8 +720,8 @@ TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, c return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); + TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + ); } TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSession& session, const TString& query, @@ -755,8 +754,8 @@ TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSessio &Ydb::Table::V1::TableService::Stub::AsyncExplainDataQuery, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); + TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + ); return promise.GetFuture(); } @@ -846,8 +845,7 @@ TAsyncReadRowsResult TTableClient::TImpl::ReadRows(const TString& path, TValue&& responseCb, &Ydb::Table::V1::TableService::Stub::AsyncReadRows, DbDriverState_, - TRpcRequestSettings::Make(settings), // requestSettings - TEndpointKey() // preferredEndpoint + TRpcRequestSettings::Make(settings) ); return promise.GetFuture(); @@ -859,8 +857,8 @@ TAsyncStatus TTableClient::TImpl::Close(const TKqpSessionCommon* sessionImpl, co return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>( std::move(request), &Ydb::Table::V1::TableService::Stub::AsyncDeleteSession, - TRpcRequestSettings::Make(settings), - sessionImpl->GetEndpointKey()); + TRpcRequestSettings::Make(settings, sessionImpl->GetEndpointKey()) + ); } TAsyncStatus TTableClient::TImpl::CloseInternal(const TKqpSessionCommon* sessionImpl) { diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h index 875c740a6b..692cf85c33 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h @@ -262,8 +262,8 @@ private: &Ydb::Table::V1::TableService::Stub::AsyncExecuteDataQuery, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - session.SessionImpl_->GetEndpointKey()); + TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey()) + ); return promise.GetFuture(); } |