diff options
| author | vitalyisaev <[email protected]> | 2023-06-14 18:14:01 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-06-14 18:14:01 +0300 |
| commit | 607f1bd3dab2c692f1b508929fde3bb5a89e3085 (patch) | |
| tree | 434da98b7633c746d71666c182f611851552f1d8 | |
| parent | 5f83c9f31701030a967c2a618034c562d73261b0 (diff) | |
Support PostgreSQL in Connector
1. Добавлена поддержка PostgreSQL.
2. На разных уровнях появились абстракции, обобщающие работу с реляционными источниками данных:
- в `gateways.conf `- обобщённое описание кластера к любому источнику;
- в коде Go Connector - пакет `rdbms` c общей логикой работы с РСУБД.
3. Из провайдера удалена ещё одна порция Clickhouse-cпецифичной логики.
14 files changed, 102 insertions, 145 deletions
diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt index bf15f57e3fa..0841a72daef 100644 --- a/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt @@ -36,6 +36,7 @@ target_link_libraries(providers-common-proto PUBLIC contrib-libs-cxxsupp yutil library-yql-protos + connector-api-protos contrib-libs-protobuf ) target_proto_messages(providers-common-proto PRIVATE diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt index f4fc466d366..8dade7c78c9 100644 --- a/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt @@ -37,6 +37,7 @@ target_link_libraries(providers-common-proto PUBLIC contrib-libs-cxxsupp yutil library-yql-protos + connector-api-protos contrib-libs-protobuf ) target_proto_messages(providers-common-proto PRIVATE diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt index f4fc466d366..8dade7c78c9 100644 --- a/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt @@ -37,6 +37,7 @@ target_link_libraries(providers-common-proto PUBLIC contrib-libs-cxxsupp yutil library-yql-protos + connector-api-protos contrib-libs-protobuf ) target_proto_messages(providers-common-proto PRIVATE diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt index bf15f57e3fa..0841a72daef 100644 --- a/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt @@ -36,6 +36,7 @@ target_link_libraries(providers-common-proto PUBLIC contrib-libs-cxxsupp yutil library-yql-protos + connector-api-protos contrib-libs-protobuf ) target_proto_messages(providers-common-proto PRIVATE diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 87d25c05217..4b223fb456b 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -2,6 +2,7 @@ package NYql; option java_package = "ru.yandex.yql.proto"; import "ydb/library/yql/protos/clickhouse.proto"; +import "ydb/library/yql/providers/generic/connector/api/protos/connector.proto"; /////////////////////////////// common /////////////////////////////// @@ -532,11 +533,29 @@ message TDbToolConfig { /////////// Generic gateway for the external data sources //////////// +message TGenericClusterConfig { + // Cluster name + required string Name = 1; + + // Data source kind + required NYql.Connector.API.DataSourceKind Kind = 2; + + // Location represents the network address of a data source instance we want to connect + oneof Location { + // Endpoint must be used for on-premise deployments. + NYql.Connector.API.Endpoint Endpoint = 3; + // DatabaseID must be used when the data source is deployed in cloud. + // Data source FQDN and port will be resolved by MDB service. + string DatabaseID = 4; + } + + required NYql.Connector.API.Credentials Credentials = 5; +} + message TGenericGatewayConfig { required string Endpoint = 1; - - // external data sources that can be accessed via connector - optional TClickHouseGatewayConfig ClickHouse = 2; + reserved 2; + repeated TGenericClusterConfig ClusterMapping = 3; } /////////////////////////////// Root /////////////////////////////// diff --git a/ydb/library/yql/providers/common/proto/ya.make b/ydb/library/yql/providers/common/proto/ya.make index b4480f31254..3c3ffac95d8 100644 --- a/ydb/library/yql/providers/common/proto/ya.make +++ b/ydb/library/yql/providers/common/proto/ya.make @@ -7,6 +7,7 @@ SRCS( PEERDIR( ydb/library/yql/protos + ydb/library/yql/providers/generic/connector/api/protos ) IF (NOT PY_PROTOS_FOR) diff --git a/ydb/library/yql/providers/generic/connector/api/protos/connector.proto b/ydb/library/yql/providers/generic/connector/api/protos/connector.proto index c09bf302c1b..760569b4d94 100644 --- a/ydb/library/yql/providers/generic/connector/api/protos/connector.proto +++ b/ydb/library/yql/providers/generic/connector/api/protos/connector.proto @@ -398,55 +398,40 @@ message Filter { // ---------- Utils ---------- -// DataSourceInstance helps to identify the instance of a data source to route request to. -message DataSourceInstance { - oneof location { - // Network address to connect to '<prefix>://<path>[:<port>]', for example: - // http://my_external_service:2023 - // grpc://my_external_service:2023 - string endpoint = 1; - - // Preconfigured installation name hiding network details. - // This installation must be described in the configuration of Connector service app: - // installations { - // { "postgres", {"https://postgres.yandexcloud.net", other_params} - // } - string installation = 2; - } - - // What database to connect - string database = 3; +// Describes the kind of external data source +enum DataSourceKind { + DATA_SOURCE_KIND_RESERVED = 0; + CLICKHOUSE = 1; + POSTGRESQL = 2; +} - message Credentials { - // Basic - message Basic { - string username = 1; - string password = 2; - } +message Endpoint { + string host = 1; + uint32 port = 2; +} - // Copied from: https://a.yandex-team.ru/arcadia/ydb/public/api/protos/draft/fq.proto?rev=r10844260#L419 - message NoneAuth {} +message Credentials { + // Basic + message Basic { + string username = 1; + string password = 2; + } - // Copied from: https://a.yandex-team.ru/arcadia/ydb/public/api/protos/draft/fq.proto?rev=r10844260#L426 - message IamAuth { - message CurrentIAMTokenAuth {} + oneof payload { + Basic basic = 1; + } +} - message ServiceAccountAuth { - string id = 1; - } +// DataSourceInstance helps to identify the instance of a data source to route request to. +message DataSourceInstance { + // Kind of the DataSource + DataSourceKind kind = 1; - oneof identity { - CurrentIAMTokenAuth current_iam = 1; - ServiceAccountAuth service_account = 2; - } - } + // Network address to connect + Endpoint endpoint = 2; - oneof payload { - Basic basic = 1; - IamAuth iam_auth = 2; - NoneAuth none = 3; - } - } + // What database to connect + string database = 3; // Information for authentication Credentials credentials = 4; diff --git a/ydb/library/yql/providers/generic/connector/api/protos/ya.make b/ydb/library/yql/providers/generic/connector/api/protos/ya.make index 2c92b6c4813..b6f247a92ec 100644 --- a/ydb/library/yql/providers/generic/connector/api/protos/ya.make +++ b/ydb/library/yql/providers/generic/connector/api/protos/ya.make @@ -4,7 +4,8 @@ PEERDIR( ydb/public/api/protos ) -ONLY_TAGS(CPP_PROTO PY3_PROTO) +# Because Go is excluded in YDB protofiles +EXCLUDE_TAGS(GO_PROTO) SRCS( connector.proto diff --git a/ydb/library/yql/providers/generic/connector/api/ya.make b/ydb/library/yql/providers/generic/connector/api/ya.make index 721fae90b0e..b255d99f9ad 100644 --- a/ydb/library/yql/providers/generic/connector/api/ya.make +++ b/ydb/library/yql/providers/generic/connector/api/ya.make @@ -6,7 +6,8 @@ SRCS( connector.proto ) -ONLY_TAGS(CPP_PROTO PY3_PROTO) +# Because Go is excluded in YDB protofiles +EXCLUDE_TAGS(GO_PROTO) PEERDIR( ydb/library/yql/providers/generic/connector/api/protos diff --git a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp index 04452483b02..7942485feb9 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp @@ -6,7 +6,8 @@ const TString TableName = "example_3"; void SetDatabaseSourceInstance(NYql::Connector::API::DataSourceInstance* dsi) { dsi->set_database("dqrun"); - dsi->set_endpoint("localhost:9000"); + dsi->mutable_endpoint()->set_host("localhost"); + dsi->mutable_endpoint()->set_port(9000); dsi->mutable_credentials()->mutable_basic()->set_username("crab"); dsi->mutable_credentials()->mutable_basic()->set_password("qwerty12345"); } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 248d03bfde1..d5236eca168 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -96,12 +96,11 @@ namespace NYql { const auto& cluster = source.DataSource().Cast<TGenDataSource>().Cluster().StringValue(); const auto& table = settings.Table().StringValue(); const auto& token = settings.Token().Name().StringValue(); - const auto& connect = State_->Configuration->Urls[cluster]; - const auto endpoint = connect.Endpoint(); + const auto& endpoint = State_->Configuration->ClusterConfigs[cluster].endpoint(); YQL_CLOG(INFO, ProviderGeneric) << "Filling source settings" - << ": cluster: " << cluster << ", table: " << table << ", endpoint: " << endpoint; + << ": cluster: " << cluster << ", table: " << table << ", endpoint: " << endpoint.DebugString(); Generic::TSource srcDesc; srcDesc.set_token(token); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp index cca47d8c2e8..81c2aef829b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp @@ -49,13 +49,12 @@ namespace NYql { const TGenRead read(node); const auto cluster = read.DataSource().Cluster().StringValue(); YQL_CLOG(DEBUG, ProviderGeneric) << "Found cluster: " << cluster; - auto dbId = State_->Configuration->Endpoints[cluster].first; - dbId = dbId.substr(0, dbId.find(':')); - YQL_CLOG(DEBUG, ProviderGeneric) << "Found dbId: " << dbId; - const auto idKey = std::make_pair(dbId, NYql::DatabaseType::Generic); + auto databaseID = State_->Configuration->ClusterConfigs[cluster].GetDatabaseID(); + YQL_CLOG(DEBUG, ProviderGeneric) << "Found cloudID: " << databaseID; + const auto idKey = std::make_pair(databaseID, NYql::DatabaseType::Generic); const auto iter = State_->DatabaseIds.find(idKey); if (iter != State_->DatabaseIds.end()) { - YQL_CLOG(DEBUG, ProviderGeneric) << "Resolve CH id: " << dbId; + YQL_CLOG(DEBUG, ProviderGeneric) << "Resolve CloudID: " << databaseID; ids[idKey] = iter->second; } } @@ -89,7 +88,6 @@ namespace NYql { DbResolverResponse_->DatabaseId2Endpoint.end()); DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>(); YQL_CLOG(DEBUG, ProviderGeneric) << "ResolvedIds: " << FullResolvedIds_.size(); - auto& endpoints = State_->Configuration->Endpoints; const auto& id2Clusters = State_->Configuration->DbId2Clusters; for (const auto& [dbIdWithType, info] : FullResolvedIds_) { const auto& dbId = dbIdWithType.first; @@ -97,18 +95,6 @@ namespace NYql { if (iter == id2Clusters.end()) { continue; } - for (const auto& clusterName : iter->second) { - YQL_CLOG(DEBUG, ProviderGeneric) << "Resolved endpoint: " << info.Endpoint << " for id: " << dbId; - auto& [endpoint, secure] = endpoints[clusterName]; - if (const auto it = endpoint.find(':'); it != TString::npos) { - secure = info.Secure; - endpoint = info.Endpoint; - if (info.Endpoint.find(':') == TString::npos) { - const auto port = endpoint.substr(it); - endpoint += port; - } - } - } } return TStatus::Ok; } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index b36e26afa5d..67735a11e1e 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -103,26 +103,23 @@ namespace NYql { for (const auto& item : pendingTables) { Connector::API::DescribeTableRequest request; - const auto& cluster = item.first; - const auto it = State_->Configuration->Urls.find(cluster); - YQL_ENSURE(State_->Configuration->Urls.cend() != it, "Cluster not found:" << cluster); + const auto& clusterName = item.first; + const auto it = State_->Configuration->ClusterConfigs.find(clusterName); + YQL_ENSURE(State_->Configuration->ClusterConfigs.cend() != it, "Cluster not found:" << clusterName); + + const auto& clusterConfig = it->second; TString token; - if (const auto cred = State_->Types->Credentials->FindCredential("default_" + cluster)) { + if (const auto cred = State_->Types->Credentials->FindCredential("default_" + clusterName)) { token = cred->Content; } else { - token = State_->Configuration->Tokens[cluster]; + token = State_->Configuration->Tokens[clusterName]; } - const auto one = token.find('#'), two = token.rfind('#'); - YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token); - auto dsi = request.mutable_data_source_instance(); - dsi->set_endpoint(it->second.Endpoint()); - - auto auth = request.mutable_data_source_instance()->mutable_credentials()->mutable_basic(); - auth->set_username(token.substr(one + 1U, two - one - 1U)); - auth->set_password(token.substr(two + 1U)); + dsi->mutable_endpoint()->CopyFrom(clusterConfig.GetEndpoint()); + dsi->set_kind(clusterConfig.GetKind()); + dsi->mutable_credentials()->CopyFrom(clusterConfig.GetCredentials()); const auto& table = item.second; TStringBuf db, dbTable; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index 989e8b7e646..36a555f2524 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -4,6 +4,7 @@ #include <ydb/library/yql/providers/common/config/yql_setting.h> #include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/generic/connector/api/protos/connector.pb.h> #include <ydb/library/yql/utils/log/log.h> namespace NYql { @@ -32,75 +33,38 @@ namespace NYql { void Init(const TProtoConfig& config, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver, THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds) { - // TODO: support data sources other than ClickHouse here - TVector<TString> clusters(Reserve(config.GetClickHouse().ClusterMappingSize())); - for (auto& cluster : config.GetClickHouse().GetClusterMapping()) { - clusters.push_back(cluster.GetName()); + TVector<TString> clusterNames(Reserve(config.ClusterMappingSize())); + + for (auto& cluster : config.GetClusterMapping()) { + clusterNames.push_back(cluster.GetName()); + ClusterConfigs[cluster.GetName()] = cluster; } - this->SetValidClusters(clusters); + this->SetValidClusters(clusterNames); // TODO: support data sources other than ClickHouse here - this->Dispatch(config.GetClickHouse().GetDefaultSettings()); - for (auto& cluster : config.GetClickHouse().GetClusterMapping()) { - this->Dispatch(cluster.GetName(), cluster.GetSettings()); - + for (auto& cluster : config.GetClusterMapping()) { if (dbResolver) { + const auto& databaseID = cluster.GetDatabaseID(); YQL_CLOG(DEBUG, ProviderGeneric) - << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetId() - << ", cluster.GetCluster(): " << cluster.GetCluster() - << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE"); - databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::Generic)] = - NYql::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false}; - if (cluster.GetId()) { - DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName()); - YQL_CLOG(DEBUG, ProviderGeneric) << "Add dbId: " << cluster.GetId() << " to DbId2Clusters"; - } - } - - Tokens[cluster.GetName()] = cluster.GetCHToken(); - // TODO: Drop later - TString endpoint; - if (cluster.HasCluster()) { - endpoint = cluster.GetCluster(); - if (endpoint.StartsWith("https://")) { - endpoint = endpoint.substr(8); - } - endpoint = endpoint.substr(0, endpoint.find(':')); - } else { - endpoint = cluster.GetId(); - } - Endpoints[cluster.GetName()] = - std::make_pair(endpoint + ":" + ToString(cluster.GetNativeHostPort()), cluster.GetNativeSecure()); - - auto& url = Urls[cluster.GetName()]; - auto& host = url.Host; - auto& scheme = url.Scheme; - auto& port = url.Port; - host = cluster.GetCluster(); - while (host.EndsWith("/")) - host = host.substr(0u, host.length() - 1u); - if (host.StartsWith("http://")) { - scheme = HS_HTTP; - host = host.substr(7u); - port = 80; - } else { - scheme = HS_HTTPS; - port = 443; - if (host.StartsWith("https://")) { - host = host.substr(8u); + << "Settings: clusterName = " << cluster.GetName() << ", cluster cloud id = " << databaseID + << ", cluster.GetEndpoint(): " << cluster.GetEndpoint() + << ", HasEndpoint: " << (cluster.HasEndpoint() ? "TRUE" : "FALSE"); + + // TODO: recover logic with structured tokens + databaseIds[std::make_pair(databaseID, NYql::DatabaseType::Generic)] = + NYql::TDatabaseAuth{"", /*AddBearer=*/false}; + if (databaseID) { + DbId2Clusters[databaseID].emplace_back(cluster.GetName()); + YQL_CLOG(DEBUG, ProviderGeneric) << "Add cluster cloud id: " << databaseID << " to DbId2Clusters"; } } - if (const auto p = host.rfind(':'); TString::npos != p) { - port = ::FromString<ui16>(host.substr(p + 1u)); - host = host.substr(0u, p); - } - - if (cluster.HasHostScheme()) - scheme = cluster.GetHostScheme(); - if (cluster.HasHostPort()) - port = cluster.GetHostPort(); + // NOTE: Tokens map is left because of legacy. + // There are no reasons for provider to store these tokens other than + // to keep compatibility with YQL engine. + const auto& basic = cluster.GetCredentials().basic(); + Tokens[cluster.GetName()] = TStringBuilder() << "basic#" << basic.username() << "#" << basic.password(); } this->FreezeDefaults(); } @@ -109,8 +73,7 @@ namespace NYql { TGenericSettings::TConstPtr Snapshot() const; THashMap<TString, TString> Tokens; - THashMap<TString, TGenericURL> Urls; - THashMap<TString, std::pair<TString, bool>> Endpoints; + THashMap<TString, TGenericClusterConfig> ClusterConfigs; THashMap<TString, TVector<TString>> DbId2Clusters; // DatabaseId -> ClusterNames }; |
