aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSlusarenko Igor <s2m1@ydb.tech>2025-03-26 14:37:15 +0300
committerGitHub <noreply@github.com>2025-03-26 14:37:15 +0300
commita53e9240d949b9f0e79a031d80d0e0e397791fdf (patch)
tree67d4a5d5aea59b5092fbaf14b5df47a2d60f34ac
parent22ccfe5f2fd4827ebdca8e5dbc821fbb3528a696 (diff)
downloadydb-a53e9240d949b9f0e79a031d80d0e0e397791fdf.tar.gz
Choose a connector based on a request's data source kind (#16052)
-rw-r--r--ydb/core/fq/libs/init/init.cpp3
-rw-r--r--ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp2
-rw-r--r--ydb/core/testlib/test_client.cpp2
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/client.cpp117
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/client.h2
-rw-r--r--ydb/library/yql/tools/dqrun/dqrun.cpp2
6 files changed, 93 insertions, 35 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index 77ca538f485..8495e7ffe3e 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -198,8 +198,9 @@ void Init(
yqCounters->GetSubgroup("subcomponent", "http_gateway"));
NYql::NConnector::IClient::TPtr connectorClient = nullptr;
+
if (protoConfig.GetGateways().GetGeneric().HasConnector()) {
- connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
+ connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric());
}
if (protoConfig.GetTokenAccessor().GetEnabled()) {
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
index 0690dc0a023..c3f5c6759fb 100644
--- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
+++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
@@ -107,7 +107,7 @@ namespace NKikimr::NKqp {
// Initialize Connector client
if (queryServiceConfig.HasGeneric()) {
GenericGatewaysConfig = queryServiceConfig.GetGeneric();
- ConnectorClient = NYql::NConnector::MakeClientGRPC(GenericGatewaysConfig.GetConnector());
+ ConnectorClient = NYql::NConnector::MakeClientGRPC(GenericGatewaysConfig);
if (queryServiceConfig.HasMdbTransformHost()) {
MdbEndpointGenerator = NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost());
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 5a68bf34028..578f5c0f1e6 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -1215,7 +1215,7 @@ namespace Tests {
if (queryServiceConfig.HasGeneric()) {
const auto& genericGatewayConfig = queryServiceConfig.GetGeneric();
- connectorClient = NYql::NConnector::MakeClientGRPC(genericGatewayConfig.GetConnector());
+ connectorClient = NYql::NConnector::MakeClientGRPC(genericGatewayConfig);
auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId();
Runtime->RegisterService(
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp
index b7ca8536782..18a111c3aa8 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp
@@ -18,6 +18,8 @@ namespace NYql::NConnector {
///
/// Connection Factory which always returns an exact connection
///
+ /// TODO: check connection healthiness
+ ///
template<typename T>
class TSingleConnectionFactory final : public IConnectionFactory<T> {
public:
@@ -159,6 +161,27 @@ namespace NYql::NConnector {
std::vector<std::shared_ptr<NYdbGrpc::TServiceConnection<T>>> Pool_;
};
+ ///
+ /// Create a factory based on a it's type @sa NYql::EConnectionFactory
+ ///
+ std::unique_ptr<IConnectionFactory<NApi::Connector>> CreateFactoryForConnector(
+ NYql::EConnectionFactory factory,
+ std::shared_ptr<NYdbGrpc::TGRpcClientLow> client,
+ const NYdbGrpc::TGRpcClientConfig& grpcConfig,
+ const NYdbGrpc::TTcpKeepAliveSettings& keepAlive) {
+
+ switch (factory) {
+ case SINGLE:
+ return std::make_unique<TSingleConnectionFactory<NApi::Connector>>(
+ client, grpcConfig, keepAlive);
+
+ case NEW_FOR_EACH_REQUEST:
+ default:
+ return std::make_unique<TNewOnEachRequestConnectionFactory<NApi::Connector>>(
+ client, grpcConfig, keepAlive);
+ }
+ }
+
/*
struct TConnectorMetrics {
@@ -208,24 +231,8 @@ namespace NYql::NConnector {
class TClientGRPC: public IClient {
public:
- TClientGRPC(const TGenericConnectorConfig& config) {
- // TODO: place in a config file ?
- GrpcClient_ = std::make_shared<NYdbGrpc::TGRpcClientLow>(DEFAULT_CONNECTION_MANAGER_NUM_THREADS);
- auto grpcConfig = ConnectorConfigToGrpcConfig(config);
-
- auto keepAlive = NYdbGrpc::TTcpKeepAliveSettings {
- // TODO configure hardcoded values
- .Enabled = true,
- .Idle = 30,
- .Count = 5,
- .Interval = 10
- };
-
- // TODO:
- // 1. Add config parameter to TGenericConnectorConfig to choose factory
- // 2. Add support for multiple connector's config
- ConnectionFactory_ = std::make_unique<TNewOnEachRequestConnectionFactory<NApi::Connector>>(
- GrpcClient_, grpcConfig, keepAlive);
+ explicit TClientGRPC(const TGenericGatewayConfig& config) {
+ Init(config);
}
~TClientGRPC() {
@@ -300,13 +307,49 @@ namespace NYql::NConnector {
}
private:
- std::shared_ptr<NYdbGrpc::TServiceConnection<NApi::Connector>> GetConnection(const NYql::EGenericDataSourceKind&) {
- // TODO: choose appropriate connection factory by data source kind
- return ConnectionFactory_->Create();
+ void Init(const TGenericGatewayConfig& config) {
+ // TODO: place in a config file ?
+ GrpcClient_ = std::make_shared<NYdbGrpc::TGRpcClientLow>(DEFAULT_CONNECTION_MANAGER_NUM_THREADS);
+
+ auto keepAlive = NYdbGrpc::TTcpKeepAliveSettings {
+ // TODO: configure hardcoded values
+ .Enabled = true,
+ .Idle = 30,
+ .Count = 5,
+ .Interval = 10
+ };
+
+ size_t count = 0;
+ auto cfg = ConnectorConfigToGrpcConfig(
+ config.GetConnector(), count++);
+
+ DefaultFactory_ = CreateFactoryForConnector(
+ config.GetConnector().GetFactory(), GrpcClient_, cfg, keepAlive);
+
+ for (auto c : config.GetConnectors()) {
+ auto cfg = ConnectorConfigToGrpcConfig(c, count++);
+ std::shared_ptr<IConnectionFactory<NApi::Connector>> f
+ = CreateFactoryForConnector(c.GetFactory(), GrpcClient_, cfg, keepAlive);
+
+ for (auto k : c.GetForKinds()) {
+ if (!FactoryForKind_.try_emplace(NYql::EGenericDataSourceKind(k), f).second) {
+ throw yexception()
+ << "Duplicate connector is provided for the kind: "
+ << EGenericDataSourceKind_Name(k);
+ }
+ }
+ }
+ }
+
+ std::shared_ptr<NYdbGrpc::TServiceConnection<NApi::Connector>> GetConnection(NYql::EGenericDataSourceKind kind) const {
+ auto itr = FactoryForKind_.find(kind);
+
+ return FactoryForKind_.end() == itr ?
+ DefaultFactory_->Create() : itr->second->Create();
}
template<typename TResponse>
- TIteratorAsyncResult<IStreamIterator<TResponse>> DoEmptyStreamResponse(const grpc::StatusCode& code, TString msg) {
+ TIteratorAsyncResult<IStreamIterator<TResponse>> DoEmptyStreamResponse(const grpc::StatusCode& code, TString msg) const {
auto promise = NThreading::NewPromise<TIteratorResult<IStreamIterator<TResponse>>>();
auto status = NYdbGrpc::TGrpcStatus(grpc::Status(code, msg));
@@ -335,7 +378,7 @@ namespace NYql::NConnector {
typename TRpcCallback = typename NYdbGrpc::TStreamRequestReadProcessor<NApi::Connector::Stub, TRequest, TResponse>::TAsyncRequest
>
TIteratorAsyncResult<IStreamIterator<TResponse>> ServerSideStreamingCall(
- const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) {
+ const NYql::EGenericDataSourceKind& kind, const TRequest& request, TRpcCallback rpc, TDuration timeout = {}) const {
auto promise = NThreading::NewPromise<TIteratorResult<IStreamIterator<TResponse>>>();
auto callback = [promise](NYdbGrpc::TGrpcStatus&& status, NYdbGrpc::IStreamRequestReadProcessor<TResponse>::TPtr streamProcessor) mutable {
@@ -359,16 +402,22 @@ namespace NYql::NConnector {
return promise.GetFuture();
}
- NYdbGrpc::TGRpcClientConfig ConnectorConfigToGrpcConfig(const TGenericConnectorConfig& config) {
- auto cfg = NYdbGrpc::TGRpcClientConfig();
+ NYdbGrpc::TGRpcClientConfig ConnectorConfigToGrpcConfig(const TGenericConnectorConfig& config, size_t order) const {
+ auto cfg = NYdbGrpc::TGRpcClientConfig();
+
+ // Connector's name. If order equals to zero, it means that the config belongs "TGenericGatewayConfig.Connector"
+ // (default connector); otherwise, it is from "TGenericGatewayConfig.ConnectorS"
+ auto name = TStringBuilder()
+ << "Connector[" << (order == 0 ? TString("default") : TStringBuilder() << (order - 1)) << "]";
- Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in TGenericConnectorConfig: " << config.DebugString());
- Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in TGenericConnectorConfig: " << config.DebugString());
+ Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in " << name << ": " << config.DebugString());
+ Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in " << name << ": " << config.DebugString());
cfg.Locator = TStringBuilder() << config.GetEndpoint().host() << ":" << config.GetEndpoint().port();
cfg.EnableSsl = config.GetUseSsl();
- YQL_CLOG(INFO, ProviderGeneric) << "Connector endpoint: " << (config.GetUseSsl() ? "grpcs" : "grpc") << "://" << cfg.Locator;
+ YQL_CLOG(INFO, ProviderGeneric) << name << " endpoint: "
+ << (config.GetUseSsl() ? "grpcs" : "grpc") << "://" << cfg.Locator;
// Read content of CA cert
TString rootCertData;
@@ -383,10 +432,18 @@ namespace NYql::NConnector {
private:
std::shared_ptr<NYdbGrpc::TGRpcClientLow> GrpcClient_;
- std::unique_ptr<IConnectionFactory<NApi::Connector>> ConnectionFactory_;
+ std::unique_ptr<IConnectionFactory<NApi::Connector>> DefaultFactory_;
+ std::unordered_map<NYql::EGenericDataSourceKind,
+ std::shared_ptr<IConnectionFactory<NApi::Connector>>> FactoryForKind_;
};
- IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg) {
+ IClient::TPtr MakeClientGRPC(const ::NYql::TGenericGatewayConfig& cfg) {
+ if (!cfg.HasConnector()) {
+ throw yexception()
+ << "TGenericGatewayConfig.Connector is empty. "
+ << "In order to create a ClientGRPC it has to be set";
+ }
+
return std::make_shared<TClientGRPC>(cfg);
}
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.h b/ydb/library/yql/providers/generic/connector/libcpp/client.h
index 929e6a6051c..aa91aa2a854 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/client.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/client.h
@@ -173,5 +173,5 @@ namespace NYql::NConnector {
virtual ~IClient() = default;
};
- IClient::TPtr MakeClientGRPC(const NYql::TGenericConnectorConfig& cfg);
+ IClient::TPtr MakeClientGRPC(const ::NYql::TGenericGatewayConfig& cfg);
} // namespace NYql::NConnector
diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp
index 450c3a37437..31c2fe27886 100644
--- a/ydb/library/yql/tools/dqrun/dqrun.cpp
+++ b/ydb/library/yql/tools/dqrun/dqrun.cpp
@@ -1036,7 +1036,7 @@ int RunMain(int argc, const char* argv[])
clusters.emplace(to_lower(cluster.GetName()), TString{GenericProviderName});
}
- genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector());
+ genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric());
dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver, credentialsFactory));
}