aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-08-03 09:58:10 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-08-03 09:58:10 +0300
commitd9cb06d1157433d746be7c6572c97efdf1fc467b (patch)
tree5dd1f4ef1ffe46c173e0568ce9ef2382bda02a77
parentd37d53fe7cc46f079193ea62fc3c5f9f83d4c6ca (diff)
downloadydb-d9cb06d1157433d746be7c6572c97efdf1fc467b.tar.gz
UsePreferredEndpointStrictly
Connect only to an explicitly specified point endpoint.
-rw-r--r--ydb/public/lib/experimental/ydb_clickhouse_internal.cpp14
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h24
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h16
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_operation/operation.cpp6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp36
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h4
10 files changed, 57 insertions, 76 deletions
diff --git a/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp b/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp
index e830941e29..8e55f8119c 100644
--- a/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp
+++ b/ydb/public/lib/experimental/ydb_clickhouse_internal.cpp
@@ -103,13 +103,13 @@ public:
};
Connections_->RunDeferred<Ydb::ClickhouseInternal::V1::ClickhouseInternalService, Ydb::ClickhouseInternal::ScanRequest, Ydb::ClickhouseInternal::ScanResponse>(
- std::move(request),
- extractor,
- &Ydb::ClickhouseInternal::V1::ClickhouseInternalService::Stub::AsyncScan,
- DbDriverState_,
- INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- TEndpointKey(settings.Endpoint_, 0));
+ std::move(request),
+ extractor,
+ &Ydb::ClickhouseInternal::V1::ClickhouseInternalService::Stub::AsyncScan,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings, TEndpointKey(settings.Endpoint_, 0))
+ );
return promise.GetFuture();
}
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp
index d44f288da2..08fbb69693 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/actions.cpp
@@ -44,17 +44,19 @@ void TDeferredAction::OnAlarm() {
Ydb::Operations::GetOperationRequest getOperationRequest;
getOperationRequest.set_id(OperationId_);
+ TRpcRequestSettings settings;
+ settings.PreferredEndpoint = TEndpointKey(Endpoint_, 0);
+
Connection_->RunDeferred<Ydb::Operation::V1::OperationService, Ydb::Operations::GetOperationRequest, Ydb::Operations::GetOperationResponse>(
std::move(getOperationRequest),
std::move(UserResponseCb_),
&Ydb::Operation::V1::OperationService::Stub::AsyncGetOperation,
DbDriverState_,
NextDelay_,
- {},
+ settings,
true,
- TEndpointKey(Endpoint_, 0),
std::move(Context_));
-}
+ }
void TDeferredAction::OnError() {
Y_VERIFY(Connection_);
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
index 40fa6b39f8..19850c8318 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
@@ -102,7 +102,7 @@ public:
{
SetGrpcKeepAlive(clientConfig, GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY, GRpcKeepAlivePermitWithoutCalls_);
} else {
- auto endpoint = dbState->EndpointPool.GetEndpoint(preferredEndpoint);
+ auto endpoint = dbState->EndpointPool.GetEndpoint(preferredEndpoint, endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UsePreferredEndpointStrictly);
if (!endpoint) {
return {nullptr, TEndpointKey()};
}
@@ -139,7 +139,6 @@ public:
TSimpleRpc<TService, TRequest, TResponse> rpc,
TDbDriverStatePtr dbState,
const TRpcRequestSettings& requestSettings,
- const TEndpointKey& preferredEndpoint,
std::shared_ptr<IQueueClientContext> context = nullptr)
{
using NGrpc::TGrpcStatus;
@@ -166,8 +165,6 @@ public:
});
}
- auto endpointPolicy = requestSettings.EndpointPolicy;
-
WithServiceConnection<TService>(
[this, request = std::move(request), userResponseCb = std::move(userResponseCb), rpc, requestSettings, context = std::move(context), dbState]
(TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable -> void {
@@ -268,7 +265,7 @@ public:
serviceConnection->DoAdvancedRequest(std::move(request), std::move(responseCbLow), rpc, meta,
context.get());
- }, dbState, preferredEndpoint, endpointPolicy);
+ }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy);
}
template<typename TService, typename TRequest, typename TResponse>
@@ -280,7 +277,6 @@ public:
TDuration deferredTimeout,
const TRpcRequestSettings& requestSettings,
bool poll = false,
- const TEndpointKey& preferredEndpoint = TEndpointKey(),
std::shared_ptr<IQueueClientContext> context = nullptr)
{
if (!TryCreateContext(context)) {
@@ -322,7 +318,6 @@ public:
rpc,
dbState,
requestSettings,
- preferredEndpoint,
std::move(context));
}
@@ -349,7 +344,6 @@ public:
rpc,
dbState,
requestSettings,
- TEndpointKey(),
nullptr);
}
@@ -361,7 +355,6 @@ public:
TDbDriverStatePtr dbState,
TDuration deferredTimeout,
const TRpcRequestSettings& requestSettings,
- const TEndpointKey& preferredEndpoint = TEndpointKey(),
std::shared_ptr<IQueueClientContext> context = nullptr)
{
auto operationCb = [userResponseCb = std::move(userResponseCb)](Ydb::Operations::Operation* operation, TPlainStatus status) mutable {
@@ -381,7 +374,6 @@ public:
deferredTimeout,
requestSettings,
true, // poll
- preferredEndpoint,
context);
}
@@ -399,7 +391,6 @@ public:
TStreamRpc<TService, TRequest, TResponse, NGrpc::TStreamRequestReadProcessor> rpc,
TDbDriverStatePtr dbState,
const TRpcRequestSettings& requestSettings,
- const TEndpointKey& preferredEndpoint = TEndpointKey(),
std::shared_ptr<IQueueClientContext> context = nullptr)
{
using NGrpc::TGrpcStatus;
@@ -411,11 +402,8 @@ public:
return;
}
- auto endpointPolicy = requestSettings.EndpointPolicy;
-
WithServiceConnection<TService>(
- [request, responseCb = std::move(responseCb), rpc, requestSettings, context = std::move(context), dbState]
- (TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable {
+ [request, responseCb = std::move(responseCb), rpc, requestSettings, context = std::move(context), dbState](TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable {
if (!status.Ok()) {
responseCb(std::move(status), nullptr);
return;
@@ -490,7 +478,7 @@ public:
std::move(rpc),
std::move(meta),
context.get());
- }, dbState, preferredEndpoint, endpointPolicy);
+ }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy);
}
template<class TService, class TRequest, class TResponse, class TCallback>
@@ -510,8 +498,6 @@ public:
return;
}
- auto endpointPolicy = requestSettings.EndpointPolicy;
-
WithServiceConnection<TService>(
[connectedCallback = std::move(connectedCallback), rpc, requestSettings, context = std::move(context), dbState]
(TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable {
@@ -587,7 +573,7 @@ public:
std::move(rpc),
std::move(meta),
context.get());
- }, dbState, TEndpointKey(), endpointPolicy);
+ }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy);
}
TAsyncListEndpointsResult GetEndpoints(TDbDriverStatePtr dbState) override;
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h
index 9ad9525572..17d0edf089 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/rpc_request_settings/settings.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/internal_header.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/type_switcher.h>
@@ -9,20 +10,23 @@ struct TRpcRequestSettings {
TStringType TraceId;
TStringType RequestType;
std::vector<std::pair<TStringType, TStringType>> Header;
+ TEndpointKey PreferredEndpoint = {};
enum class TEndpointPolicy {
- UsePreferredEndpoint,
- UseDiscoveryEndpoint // Use single discovery endpoint for request
- } EndpointPolicy = TEndpointPolicy::UsePreferredEndpoint;
+ UsePreferredEndpointOptionally, // Try to use the preferred endpoint
+ UsePreferredEndpointStrictly, // Use only the preferred endpoint
+ UseDiscoveryEndpoint // Use single discovery endpoint
+ } EndpointPolicy = TEndpointPolicy::UsePreferredEndpointOptionally;
bool UseAuth = true;
TDuration ClientTimeout;
- template<typename TRequestSettings>
- static TRpcRequestSettings Make(const TRequestSettings& settings) {
+ template <typename TRequestSettings>
+ static TRpcRequestSettings Make(const TRequestSettings& settings, const TEndpointKey& preferredEndpoint = {}, TEndpointPolicy endpointPolicy = TEndpointPolicy::UsePreferredEndpointOptionally) {
TRpcRequestSettings rpcSettings;
rpcSettings.TraceId = settings.TraceId_;
rpcSettings.RequestType = settings.RequestType_;
rpcSettings.Header = settings.Header_;
- rpcSettings.EndpointPolicy = TEndpointPolicy::UsePreferredEndpoint;
+ rpcSettings.PreferredEndpoint = preferredEndpoint;
+ rpcSettings.EndpointPolicy = endpointPolicy;
rpcSettings.UseAuth = true;
rpcSettings.ClientTimeout = settings.ClientTimeout_;
return rpcSettings;
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index 914ec96711..b8d12d7371 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.6.0 \ No newline at end of file
+2.6.1 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h b/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h
index 71fbb7c25c..18d65505bc 100644
--- a/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h
+++ b/ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h
@@ -59,8 +59,7 @@ protected:
NThreading::TFuture<TStatus> RunSimple(
TRequest&& request,
TAsyncRequest<TService, TRequest, TResponse> rpc,
- const TRpcRequestSettings& requestSettings = {},
- const TEndpointKey& preferredEndpoint = TEndpointKey())
+ const TRpcRequestSettings& requestSettings = {})
{
auto promise = NThreading::NewPromise<TStatus>();
@@ -76,8 +75,7 @@ protected:
rpc,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- requestSettings,
- preferredEndpoint);
+ requestSettings);
return promise.GetFuture();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp b/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp
index d30774243b..66f0439c6e 100644
--- a/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_operation/operation.cpp
@@ -52,8 +52,7 @@ class TOperationClient::TImpl : public TClientImplCommon<TOperationClient::TImpl
extractor,
rpc,
DbDriverState_,
- rpcSettings,
- TEndpointKey());
+ rpcSettings);
return promise.GetFuture();
}
@@ -116,8 +115,7 @@ public:
extractor,
&V1::OperationService::Stub::AsyncListOperations,
DbDriverState_,
- rpcSettings,
- TEndpointKey());
+ rpcSettings);
return promise.GetFuture();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index 2ca4d63357..12ea4166dc 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -79,8 +79,7 @@ public:
responseCb,
&V1::QueryService::Stub::AsyncExecuteScript,
DbDriverState_,
- TRpcRequestSettings::Make(settings),
- TEndpointKey());
+ TRpcRequestSettings::Make(settings));
return promise.GetFuture();
}
@@ -134,8 +133,7 @@ public:
extractor,
&V1::QueryService::Stub::AsyncFetchScriptResults,
DbDriverState_,
- rpcSettings,
- TEndpointKey());
+ rpcSettings);
return promise.GetFuture();
}
@@ -185,12 +183,11 @@ public:
const auto sessionId = resp->session_id();
request.set_session_id(sessionId);
- const auto endpointKey = TEndpointKey(endpoint, GetNodeIdFromSession(sessionId));
-
auto args = std::make_shared<TSession::TImpl::TAttachSessionArgs>(promise, sessionId, endpoint, client);
// Do not pass client timeout here. Session must be alive
- static const TRpcRequestSettings rpcSettings;
+ TRpcRequestSettings rpcSettings;
+ rpcSettings.PreferredEndpoint = TEndpointKey(endpoint, GetNodeIdFromSession(sessionId));
Connections_->StartReadStream<
Ydb::Query::V1::QueryService,
@@ -208,8 +205,7 @@ public:
},
&Ydb::Query::V1::QueryService::Stub::AsyncAttachSession,
DbDriverState_,
- rpcSettings,
- endpointKey);
+ rpcSettings);
}
TAsyncCreateSessionResult CreateAttachedSession(TDuration timeout) {
@@ -245,8 +241,7 @@ public:
extractor,
&V1::QueryService::Stub::AsyncCreateSession,
DbDriverState_,
- rpcSettings,
- TEndpointKey());
+ rpcSettings);
return promise.GetFuture();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index 478f927f6a..87fd1462f9 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -394,7 +394,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio
auto createSessionPromise = NewPromise<TCreateSessionResult>();
auto self = shared_from_this();
- auto rpcSettings = TRpcRequestSettings::Make(settings);
+ auto rpcSettings = TRpcRequestSettings::Make(settings, TEndpointKey(preferredLocation, 0));
rpcSettings.Header.push_back({NYdb::YDB_CLIENT_CAPABILITIES, NYdb::YDB_CLIENT_CAPABILITY_SESSION_BALANCER});
auto createSessionExtractor = [createSessionPromise, self, standalone]
@@ -423,8 +423,7 @@ TAsyncCreateSessionResult TTableClient::TImpl::CreateSession(const TCreateSessio
&Ydb::Table::V1::TableService::Stub::AsyncCreateSession,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- rpcSettings,
- TEndpointKey(preferredLocation, 0));
+ rpcSettings);
std::weak_ptr<TDbDriverState> state = DbDriverState_;
@@ -466,8 +465,8 @@ TAsyncKeepAliveResult TTableClient::TImpl::KeepAlive(const TSession::TImpl* sess
&Ydb::Table::V1::TableService::Stub::AsyncKeepAlive,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, session->GetEndpointKey())
+ );
return keepAliveResultPromise.GetFuture();
}
@@ -619,8 +618,8 @@ TAsyncPrepareQueryResult TTableClient::TImpl::PrepareDataQuery(const TSession& s
&Ydb::Table::V1::TableService::Stub::AsyncPrepareDataQuery,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey())
+ );
return promise.GetFuture();
}
@@ -667,8 +666,8 @@ TAsyncBeginTransactionResult TTableClient::TImpl::BeginTransaction(const TSessio
&Ydb::Table::V1::TableService::Stub::AsyncBeginTransaction,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey())
+ );
return promise.GetFuture();
}
@@ -705,8 +704,8 @@ TAsyncCommitTransactionResult TTableClient::TImpl::CommitTransaction(const TSess
&Ydb::Table::V1::TableService::Stub::AsyncCommitTransaction,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey())
+ );
return promise.GetFuture();
}
@@ -721,8 +720,8 @@ TAsyncStatus TTableClient::TImpl::RollbackTransaction(const TSession& session, c
return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse>(
std::move(request),
&Ydb::Table::V1::TableService::Stub::AsyncRollbackTransaction,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey())
+ );
}
TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSession& session, const TString& query,
@@ -755,8 +754,8 @@ TAsyncExplainDataQueryResult TTableClient::TImpl::ExplainDataQuery(const TSessio
&Ydb::Table::V1::TableService::Stub::AsyncExplainDataQuery,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey())
+ );
return promise.GetFuture();
}
@@ -846,8 +845,7 @@ TAsyncReadRowsResult TTableClient::TImpl::ReadRows(const TString& path, TValue&&
responseCb,
&Ydb::Table::V1::TableService::Stub::AsyncReadRows,
DbDriverState_,
- TRpcRequestSettings::Make(settings), // requestSettings
- TEndpointKey() // preferredEndpoint
+ TRpcRequestSettings::Make(settings)
);
return promise.GetFuture();
@@ -859,8 +857,8 @@ TAsyncStatus TTableClient::TImpl::Close(const TKqpSessionCommon* sessionImpl, co
return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>(
std::move(request),
&Ydb::Table::V1::TableService::Stub::AsyncDeleteSession,
- TRpcRequestSettings::Make(settings),
- sessionImpl->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, sessionImpl->GetEndpointKey())
+ );
}
TAsyncStatus TTableClient::TImpl::CloseInternal(const TKqpSessionCommon* sessionImpl) {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
index 875c740a6b..692cf85c33 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
@@ -262,8 +262,8 @@ private:
&Ydb::Table::V1::TableService::Stub::AsyncExecuteDataQuery,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
- TRpcRequestSettings::Make(settings),
- session.SessionImpl_->GetEndpointKey());
+ TRpcRequestSettings::Make(settings, session.SessionImpl_->GetEndpointKey())
+ );
return promise.GetFuture();
}