summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-06-14 18:14:01 +0300
committervitalyisaev <[email protected]>2023-06-14 18:14:01 +0300
commit607f1bd3dab2c692f1b508929fde3bb5a89e3085 (patch)
tree434da98b7633c746d71666c182f611851552f1d8
parent5f83c9f31701030a967c2a618034c562d73261b0 (diff)
Support PostgreSQL in Connector
1. Добавлена поддержка PostgreSQL. 2. На разных уровнях появились абстракции, обобщающие работу с реляционными источниками данных: - в `gateways.conf `- обобщённое описание кластера к любому источнику; - в коде Go Connector - пакет `rdbms` c общей логикой работы с РСУБД. 3. Из провайдера удалена ещё одна порция Clickhouse-cпецифичной логики.
-rw-r--r--ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto25
-rw-r--r--ydb/library/yql/providers/common/proto/ya.make1
-rw-r--r--ydb/library/yql/providers/generic/connector/api/protos/connector.proto71
-rw-r--r--ydb/library/yql/providers/generic/connector/api/protos/ya.make3
-rw-r--r--ydb/library/yql/providers/generic/connector/api/ya.make3
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp3
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp5
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp22
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp23
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_settings.h87
14 files changed, 102 insertions, 145 deletions
diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt
index bf15f57e3fa..0841a72daef 100644
--- a/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/common/proto/CMakeLists.darwin-x86_64.txt
@@ -36,6 +36,7 @@ target_link_libraries(providers-common-proto PUBLIC
contrib-libs-cxxsupp
yutil
library-yql-protos
+ connector-api-protos
contrib-libs-protobuf
)
target_proto_messages(providers-common-proto PRIVATE
diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt
index f4fc466d366..8dade7c78c9 100644
--- a/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/common/proto/CMakeLists.linux-aarch64.txt
@@ -37,6 +37,7 @@ target_link_libraries(providers-common-proto PUBLIC
contrib-libs-cxxsupp
yutil
library-yql-protos
+ connector-api-protos
contrib-libs-protobuf
)
target_proto_messages(providers-common-proto PRIVATE
diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt
index f4fc466d366..8dade7c78c9 100644
--- a/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/common/proto/CMakeLists.linux-x86_64.txt
@@ -37,6 +37,7 @@ target_link_libraries(providers-common-proto PUBLIC
contrib-libs-cxxsupp
yutil
library-yql-protos
+ connector-api-protos
contrib-libs-protobuf
)
target_proto_messages(providers-common-proto PRIVATE
diff --git a/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt
index bf15f57e3fa..0841a72daef 100644
--- a/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/common/proto/CMakeLists.windows-x86_64.txt
@@ -36,6 +36,7 @@ target_link_libraries(providers-common-proto PUBLIC
contrib-libs-cxxsupp
yutil
library-yql-protos
+ connector-api-protos
contrib-libs-protobuf
)
target_proto_messages(providers-common-proto PRIVATE
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 87d25c05217..4b223fb456b 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -2,6 +2,7 @@ package NYql;
option java_package = "ru.yandex.yql.proto";
import "ydb/library/yql/protos/clickhouse.proto";
+import "ydb/library/yql/providers/generic/connector/api/protos/connector.proto";
/////////////////////////////// common ///////////////////////////////
@@ -532,11 +533,29 @@ message TDbToolConfig {
/////////// Generic gateway for the external data sources ////////////
+message TGenericClusterConfig {
+ // Cluster name
+ required string Name = 1;
+
+ // Data source kind
+ required NYql.Connector.API.DataSourceKind Kind = 2;
+
+ // Location represents the network address of a data source instance we want to connect
+ oneof Location {
+ // Endpoint must be used for on-premise deployments.
+ NYql.Connector.API.Endpoint Endpoint = 3;
+ // DatabaseID must be used when the data source is deployed in cloud.
+ // Data source FQDN and port will be resolved by MDB service.
+ string DatabaseID = 4;
+ }
+
+ required NYql.Connector.API.Credentials Credentials = 5;
+}
+
message TGenericGatewayConfig {
required string Endpoint = 1;
-
- // external data sources that can be accessed via connector
- optional TClickHouseGatewayConfig ClickHouse = 2;
+ reserved 2;
+ repeated TGenericClusterConfig ClusterMapping = 3;
}
/////////////////////////////// Root ///////////////////////////////
diff --git a/ydb/library/yql/providers/common/proto/ya.make b/ydb/library/yql/providers/common/proto/ya.make
index b4480f31254..3c3ffac95d8 100644
--- a/ydb/library/yql/providers/common/proto/ya.make
+++ b/ydb/library/yql/providers/common/proto/ya.make
@@ -7,6 +7,7 @@ SRCS(
PEERDIR(
ydb/library/yql/protos
+ ydb/library/yql/providers/generic/connector/api/protos
)
IF (NOT PY_PROTOS_FOR)
diff --git a/ydb/library/yql/providers/generic/connector/api/protos/connector.proto b/ydb/library/yql/providers/generic/connector/api/protos/connector.proto
index c09bf302c1b..760569b4d94 100644
--- a/ydb/library/yql/providers/generic/connector/api/protos/connector.proto
+++ b/ydb/library/yql/providers/generic/connector/api/protos/connector.proto
@@ -398,55 +398,40 @@ message Filter {
// ---------- Utils ----------
-// DataSourceInstance helps to identify the instance of a data source to route request to.
-message DataSourceInstance {
- oneof location {
- // Network address to connect to '<prefix>://<path>[:<port>]', for example:
- // http://my_external_service:2023
- // grpc://my_external_service:2023
- string endpoint = 1;
-
- // Preconfigured installation name hiding network details.
- // This installation must be described in the configuration of Connector service app:
- // installations {
- // { "postgres", {"https://postgres.yandexcloud.net", other_params}
- // }
- string installation = 2;
- }
-
- // What database to connect
- string database = 3;
+// Describes the kind of external data source
+enum DataSourceKind {
+ DATA_SOURCE_KIND_RESERVED = 0;
+ CLICKHOUSE = 1;
+ POSTGRESQL = 2;
+}
- message Credentials {
- // Basic
- message Basic {
- string username = 1;
- string password = 2;
- }
+message Endpoint {
+ string host = 1;
+ uint32 port = 2;
+}
- // Copied from: https://a.yandex-team.ru/arcadia/ydb/public/api/protos/draft/fq.proto?rev=r10844260#L419
- message NoneAuth {}
+message Credentials {
+ // Basic
+ message Basic {
+ string username = 1;
+ string password = 2;
+ }
- // Copied from: https://a.yandex-team.ru/arcadia/ydb/public/api/protos/draft/fq.proto?rev=r10844260#L426
- message IamAuth {
- message CurrentIAMTokenAuth {}
+ oneof payload {
+ Basic basic = 1;
+ }
+}
- message ServiceAccountAuth {
- string id = 1;
- }
+// DataSourceInstance helps to identify the instance of a data source to route request to.
+message DataSourceInstance {
+ // Kind of the DataSource
+ DataSourceKind kind = 1;
- oneof identity {
- CurrentIAMTokenAuth current_iam = 1;
- ServiceAccountAuth service_account = 2;
- }
- }
+ // Network address to connect
+ Endpoint endpoint = 2;
- oneof payload {
- Basic basic = 1;
- IamAuth iam_auth = 2;
- NoneAuth none = 3;
- }
- }
+ // What database to connect
+ string database = 3;
// Information for authentication
Credentials credentials = 4;
diff --git a/ydb/library/yql/providers/generic/connector/api/protos/ya.make b/ydb/library/yql/providers/generic/connector/api/protos/ya.make
index 2c92b6c4813..b6f247a92ec 100644
--- a/ydb/library/yql/providers/generic/connector/api/protos/ya.make
+++ b/ydb/library/yql/providers/generic/connector/api/protos/ya.make
@@ -4,7 +4,8 @@ PEERDIR(
ydb/public/api/protos
)
-ONLY_TAGS(CPP_PROTO PY3_PROTO)
+# Because Go is excluded in YDB protofiles
+EXCLUDE_TAGS(GO_PROTO)
SRCS(
connector.proto
diff --git a/ydb/library/yql/providers/generic/connector/api/ya.make b/ydb/library/yql/providers/generic/connector/api/ya.make
index 721fae90b0e..b255d99f9ad 100644
--- a/ydb/library/yql/providers/generic/connector/api/ya.make
+++ b/ydb/library/yql/providers/generic/connector/api/ya.make
@@ -6,7 +6,8 @@ SRCS(
connector.proto
)
-ONLY_TAGS(CPP_PROTO PY3_PROTO)
+# Because Go is excluded in YDB protofiles
+EXCLUDE_TAGS(GO_PROTO)
PEERDIR(
ydb/library/yql/providers/generic/connector/api/protos
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp
index 04452483b02..7942485feb9 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/cli/main.cpp
@@ -6,7 +6,8 @@ const TString TableName = "example_3";
void SetDatabaseSourceInstance(NYql::Connector::API::DataSourceInstance* dsi) {
dsi->set_database("dqrun");
- dsi->set_endpoint("localhost:9000");
+ dsi->mutable_endpoint()->set_host("localhost");
+ dsi->mutable_endpoint()->set_port(9000);
dsi->mutable_credentials()->mutable_basic()->set_username("crab");
dsi->mutable_credentials()->mutable_basic()->set_password("qwerty12345");
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
index 248d03bfde1..d5236eca168 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp
@@ -96,12 +96,11 @@ namespace NYql {
const auto& cluster = source.DataSource().Cast<TGenDataSource>().Cluster().StringValue();
const auto& table = settings.Table().StringValue();
const auto& token = settings.Token().Name().StringValue();
- const auto& connect = State_->Configuration->Urls[cluster];
- const auto endpoint = connect.Endpoint();
+ const auto& endpoint = State_->Configuration->ClusterConfigs[cluster].endpoint();
YQL_CLOG(INFO, ProviderGeneric)
<< "Filling source settings"
- << ": cluster: " << cluster << ", table: " << table << ", endpoint: " << endpoint;
+ << ": cluster: " << cluster << ", table: " << table << ", endpoint: " << endpoint.DebugString();
Generic::TSource srcDesc;
srcDesc.set_token(token);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
index cca47d8c2e8..81c2aef829b 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp
@@ -49,13 +49,12 @@ namespace NYql {
const TGenRead read(node);
const auto cluster = read.DataSource().Cluster().StringValue();
YQL_CLOG(DEBUG, ProviderGeneric) << "Found cluster: " << cluster;
- auto dbId = State_->Configuration->Endpoints[cluster].first;
- dbId = dbId.substr(0, dbId.find(':'));
- YQL_CLOG(DEBUG, ProviderGeneric) << "Found dbId: " << dbId;
- const auto idKey = std::make_pair(dbId, NYql::DatabaseType::Generic);
+ auto databaseID = State_->Configuration->ClusterConfigs[cluster].GetDatabaseID();
+ YQL_CLOG(DEBUG, ProviderGeneric) << "Found cloudID: " << databaseID;
+ const auto idKey = std::make_pair(databaseID, NYql::DatabaseType::Generic);
const auto iter = State_->DatabaseIds.find(idKey);
if (iter != State_->DatabaseIds.end()) {
- YQL_CLOG(DEBUG, ProviderGeneric) << "Resolve CH id: " << dbId;
+ YQL_CLOG(DEBUG, ProviderGeneric) << "Resolve CloudID: " << databaseID;
ids[idKey] = iter->second;
}
}
@@ -89,7 +88,6 @@ namespace NYql {
DbResolverResponse_->DatabaseId2Endpoint.end());
DbResolverResponse_ = std::make_shared<NYql::TDbResolverResponse>();
YQL_CLOG(DEBUG, ProviderGeneric) << "ResolvedIds: " << FullResolvedIds_.size();
- auto& endpoints = State_->Configuration->Endpoints;
const auto& id2Clusters = State_->Configuration->DbId2Clusters;
for (const auto& [dbIdWithType, info] : FullResolvedIds_) {
const auto& dbId = dbIdWithType.first;
@@ -97,18 +95,6 @@ namespace NYql {
if (iter == id2Clusters.end()) {
continue;
}
- for (const auto& clusterName : iter->second) {
- YQL_CLOG(DEBUG, ProviderGeneric) << "Resolved endpoint: " << info.Endpoint << " for id: " << dbId;
- auto& [endpoint, secure] = endpoints[clusterName];
- if (const auto it = endpoint.find(':'); it != TString::npos) {
- secure = info.Secure;
- endpoint = info.Endpoint;
- if (info.Endpoint.find(':') == TString::npos) {
- const auto port = endpoint.substr(it);
- endpoint += port;
- }
- }
- }
}
return TStatus::Ok;
}
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index b36e26afa5d..67735a11e1e 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -103,26 +103,23 @@ namespace NYql {
for (const auto& item : pendingTables) {
Connector::API::DescribeTableRequest request;
- const auto& cluster = item.first;
- const auto it = State_->Configuration->Urls.find(cluster);
- YQL_ENSURE(State_->Configuration->Urls.cend() != it, "Cluster not found:" << cluster);
+ const auto& clusterName = item.first;
+ const auto it = State_->Configuration->ClusterConfigs.find(clusterName);
+ YQL_ENSURE(State_->Configuration->ClusterConfigs.cend() != it, "Cluster not found:" << clusterName);
+
+ const auto& clusterConfig = it->second;
TString token;
- if (const auto cred = State_->Types->Credentials->FindCredential("default_" + cluster)) {
+ if (const auto cred = State_->Types->Credentials->FindCredential("default_" + clusterName)) {
token = cred->Content;
} else {
- token = State_->Configuration->Tokens[cluster];
+ token = State_->Configuration->Tokens[clusterName];
}
- const auto one = token.find('#'), two = token.rfind('#');
- YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token);
-
auto dsi = request.mutable_data_source_instance();
- dsi->set_endpoint(it->second.Endpoint());
-
- auto auth = request.mutable_data_source_instance()->mutable_credentials()->mutable_basic();
- auth->set_username(token.substr(one + 1U, two - one - 1U));
- auth->set_password(token.substr(two + 1U));
+ dsi->mutable_endpoint()->CopyFrom(clusterConfig.GetEndpoint());
+ dsi->set_kind(clusterConfig.GetKind());
+ dsi->mutable_credentials()->CopyFrom(clusterConfig.GetCredentials());
const auto& table = item.second;
TStringBuf db, dbTable;
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
index 989e8b7e646..36a555f2524 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h
@@ -4,6 +4,7 @@
#include <ydb/library/yql/providers/common/config/yql_setting.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/generic/connector/api/protos/connector.pb.h>
#include <ydb/library/yql/utils/log/log.h>
namespace NYql {
@@ -32,75 +33,38 @@ namespace NYql {
void Init(const TProtoConfig& config, const std::shared_ptr<NYql::IDatabaseAsyncResolver> dbResolver,
THashMap<std::pair<TString, NYql::DatabaseType>, NYql::TDatabaseAuth>& databaseIds)
{
- // TODO: support data sources other than ClickHouse here
- TVector<TString> clusters(Reserve(config.GetClickHouse().ClusterMappingSize()));
- for (auto& cluster : config.GetClickHouse().GetClusterMapping()) {
- clusters.push_back(cluster.GetName());
+ TVector<TString> clusterNames(Reserve(config.ClusterMappingSize()));
+
+ for (auto& cluster : config.GetClusterMapping()) {
+ clusterNames.push_back(cluster.GetName());
+ ClusterConfigs[cluster.GetName()] = cluster;
}
- this->SetValidClusters(clusters);
+ this->SetValidClusters(clusterNames);
// TODO: support data sources other than ClickHouse here
- this->Dispatch(config.GetClickHouse().GetDefaultSettings());
- for (auto& cluster : config.GetClickHouse().GetClusterMapping()) {
- this->Dispatch(cluster.GetName(), cluster.GetSettings());
-
+ for (auto& cluster : config.GetClusterMapping()) {
if (dbResolver) {
+ const auto& databaseID = cluster.GetDatabaseID();
YQL_CLOG(DEBUG, ProviderGeneric)
- << "Settings: clusterName = " << cluster.GetName() << ", clusterDbId = " << cluster.GetId()
- << ", cluster.GetCluster(): " << cluster.GetCluster()
- << ", HasCluster: " << (cluster.HasCluster() ? "TRUE" : "FALSE");
- databaseIds[std::make_pair(cluster.GetId(), NYql::DatabaseType::Generic)] =
- NYql::TDatabaseAuth{cluster.GetCHToken(), /*AddBearer=*/false};
- if (cluster.GetId()) {
- DbId2Clusters[cluster.GetId()].emplace_back(cluster.GetName());
- YQL_CLOG(DEBUG, ProviderGeneric) << "Add dbId: " << cluster.GetId() << " to DbId2Clusters";
- }
- }
-
- Tokens[cluster.GetName()] = cluster.GetCHToken();
- // TODO: Drop later
- TString endpoint;
- if (cluster.HasCluster()) {
- endpoint = cluster.GetCluster();
- if (endpoint.StartsWith("https://")) {
- endpoint = endpoint.substr(8);
- }
- endpoint = endpoint.substr(0, endpoint.find(':'));
- } else {
- endpoint = cluster.GetId();
- }
- Endpoints[cluster.GetName()] =
- std::make_pair(endpoint + ":" + ToString(cluster.GetNativeHostPort()), cluster.GetNativeSecure());
-
- auto& url = Urls[cluster.GetName()];
- auto& host = url.Host;
- auto& scheme = url.Scheme;
- auto& port = url.Port;
- host = cluster.GetCluster();
- while (host.EndsWith("/"))
- host = host.substr(0u, host.length() - 1u);
- if (host.StartsWith("http://")) {
- scheme = HS_HTTP;
- host = host.substr(7u);
- port = 80;
- } else {
- scheme = HS_HTTPS;
- port = 443;
- if (host.StartsWith("https://")) {
- host = host.substr(8u);
+ << "Settings: clusterName = " << cluster.GetName() << ", cluster cloud id = " << databaseID
+ << ", cluster.GetEndpoint(): " << cluster.GetEndpoint()
+ << ", HasEndpoint: " << (cluster.HasEndpoint() ? "TRUE" : "FALSE");
+
+ // TODO: recover logic with structured tokens
+ databaseIds[std::make_pair(databaseID, NYql::DatabaseType::Generic)] =
+ NYql::TDatabaseAuth{"", /*AddBearer=*/false};
+ if (databaseID) {
+ DbId2Clusters[databaseID].emplace_back(cluster.GetName());
+ YQL_CLOG(DEBUG, ProviderGeneric) << "Add cluster cloud id: " << databaseID << " to DbId2Clusters";
}
}
- if (const auto p = host.rfind(':'); TString::npos != p) {
- port = ::FromString<ui16>(host.substr(p + 1u));
- host = host.substr(0u, p);
- }
-
- if (cluster.HasHostScheme())
- scheme = cluster.GetHostScheme();
- if (cluster.HasHostPort())
- port = cluster.GetHostPort();
+ // NOTE: Tokens map is left because of legacy.
+ // There are no reasons for provider to store these tokens other than
+ // to keep compatibility with YQL engine.
+ const auto& basic = cluster.GetCredentials().basic();
+ Tokens[cluster.GetName()] = TStringBuilder() << "basic#" << basic.username() << "#" << basic.password();
}
this->FreezeDefaults();
}
@@ -109,8 +73,7 @@ namespace NYql {
TGenericSettings::TConstPtr Snapshot() const;
THashMap<TString, TString> Tokens;
- THashMap<TString, TGenericURL> Urls;
- THashMap<TString, std::pair<TString, bool>> Endpoints;
+ THashMap<TString, TGenericClusterConfig> ClusterConfigs;
THashMap<TString, TVector<TString>> DbId2Clusters; // DatabaseId -> ClusterNames
};