aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTimur Sufiyanov <fa-luke16@mail.ru>2024-07-08 14:13:58 +0300
committerGitHub <noreply@github.com>2024-07-08 14:13:58 +0300
commit7073beb9ee3c43199529046c0b561da135185675 (patch)
treee3dff47002a30bc4ea86d8df7eb8f6c69e76eea5
parentab119a2fa80082a678a1b9df3740894b2f4a49d5 (diff)
downloadydb-7073beb9ee3c43199529046c0b561da135185675.tar.gz
YDB FQ: Add MySQL to fq proxy (#6298)
-rw-r--r--ydb/core/fq/libs/actors/clusters_from_connections.cpp12
-rw-r--r--ydb/core/fq/libs/common/util.cpp11
-rw-r--r--ydb/core/fq/libs/compute/common/config.h1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp13
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/utils/utils.h3
-rw-r--r--ydb/core/fq/libs/control_plane_storage/request_validators.cpp15
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp2
-rw-r--r--ydb/public/api/protos/draft/fq.proto10
8 files changed, 67 insertions, 0 deletions
diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp
index 36a5fe05f2..7b2d8187fd 100644
--- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp
+++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp
@@ -283,6 +283,18 @@ void AddClustersFromConnections(
clusters.emplace(connectionName, GenericProviderName);
break;
}
+ case FederatedQuery::ConnectionSetting::kMysqlCluster: {
+ FillGenericClusterConfig(
+ common,
+ *gatewaysConfig.MutableGeneric()->AddClusterMapping(),
+ conn.content().setting().mysql_cluster(),
+ connectionName,
+ NYql::NConnector::NApi::EDataSourceKind::MYSQL,
+ authToken,
+ accountIdSignatures);
+ clusters.emplace(connectionName, GenericProviderName);
+ break;
+ }
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
diff --git a/ydb/core/fq/libs/common/util.cpp b/ydb/core/fq/libs/common/util.cpp
index 433709568e..61d2ea43bb 100644
--- a/ydb/core/fq/libs/common/util.cpp
+++ b/ydb/core/fq/libs/common/util.cpp
@@ -129,6 +129,9 @@ TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
+ case FederatedQuery::ConnectionSetting::kMysqlCluster: {
+ return GetServiceAccountId(setting.mysql_cluster().auth());
+ }
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
@@ -162,6 +165,8 @@ TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
return setting.postgresql_cluster().login();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().login();
+ case FederatedQuery::ConnectionSetting::kMysqlCluster:
+ return setting.mysql_cluster().login();
}
}
@@ -183,6 +188,8 @@ TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
return setting.postgresql_cluster().password();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().password();
+ case FederatedQuery::ConnectionSetting::kMysqlCluster:
+ return setting.mysql_cluster().password();
}
}
@@ -204,6 +211,8 @@ EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting&
return GetBasicAuthMethod(setting.postgresql_cluster().auth());
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return GetBasicAuthMethod(setting.greenplum_cluster().auth());
+ case FederatedQuery::ConnectionSetting::kMysqlCluster:
+ return GetBasicAuthMethod(setting.mysql_cluster().auth());
}
}
@@ -223,6 +232,8 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
return connection.content().setting().postgresql_cluster().auth();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return connection.content().setting().greenplum_cluster().auth();
+ case FederatedQuery::ConnectionSetting::kMysqlCluster:
+ return connection.content().setting().mysql_cluster().auth();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth{};
}
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h
index 5817e2d94d..dec5b1a84f 100644
--- a/ydb/core/fq/libs/compute/common/config.h
+++ b/ydb/core/fq/libs/compute/common/config.h
@@ -165,6 +165,7 @@ public:
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
+ case FederatedQuery::ConnectionSetting::kMysqlCluster:
case FederatedQuery::ConnectionSetting::kYdbDatabase:
return true;
case FederatedQuery::ConnectionSetting::kDataStreams:
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
index 728bb20811..1a781f3742 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
@@ -248,6 +248,19 @@ TString MakeCreateExternalDataSourceQuery(
"schema"_a = gpschema ? ", SCHEMA=" + EncloseAndEscapeString(gpschema, '"') : TString{});
}
+ case FederatedQuery::ConnectionSetting::kMysqlCluster: {
+ properties = fmt::format(
+ R"(
+ SOURCE_TYPE="MySQL",
+ MDB_CLUSTER_ID={mdb_cluster_id},
+ DATABASE_NAME={database_name},
+ USE_TLS="{use_tls}"
+ )",
+ "mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_id(), '"'),
+ "database_name"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_name(), '"'),
+ "use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");
+
+ }
break;
}
diff --git a/ydb/core/fq/libs/control_plane_proxy/utils/utils.h b/ydb/core/fq/libs/control_plane_proxy/utils/utils.h
index 1e6a531e2b..2eb9702387 100644
--- a/ydb/core/fq/libs/control_plane_proxy/utils/utils.h
+++ b/ydb/core/fq/libs/control_plane_proxy/utils/utils.h
@@ -34,6 +34,9 @@ TString ExtractServiceAccountIdWithConnection(const T& setting) {
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
return GetServiceAccountId(setting.greenplum_cluster().auth());
}
+ case FederatedQuery::ConnectionSetting::kMysqlCluster: {
+ return GetServiceAccountId(setting.mysql_cluster().auth());
+ }
// Do not replace with default. Adding a new connection should cause a compilation error
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
break;
diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
index 03cf908e52..6ed9dbb9db 100644
--- a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
@@ -84,6 +84,21 @@ NYql::TIssues ValidateConnectionSetting(
}
break;
}
+ case FederatedQuery::ConnectionSetting::kMysqlCluster: {
+ const FederatedQuery::MySQLCluster database = setting.mysql_cluster();
+ if (!database.has_auth() || database.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.auth field is not specified"));
+ }
+
+ if (database.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
+ }
+
+ if (!database.database_id() && !database.database_name()) {
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.{database_id or database_name} field is not specified"));
+ }
+ break;
+ }
case FederatedQuery::ConnectionSetting::kObjectStorage: {
const FederatedQuery::ObjectStorageConnection objectStorage = setting.object_storage();
if (!objectStorage.has_auth() || objectStorage.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 62a2ad749a..ae371c52b8 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -41,6 +41,8 @@ FederatedQuery::IamAuth::IdentityCase GetIamAuth(const FederatedQuery::Connectio
return setting.postgresql_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
return setting.greenplum_cluster().auth().identity_case();
+ case FederatedQuery::ConnectionSetting::kMysqlCluster:
+ return setting.mysql_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return FederatedQuery::IamAuth::IDENTITY_NOT_SET;
}
diff --git a/ydb/public/api/protos/draft/fq.proto b/ydb/public/api/protos/draft/fq.proto
index 709e0a963a..d1713334a0 100644
--- a/ydb/public/api/protos/draft/fq.proto
+++ b/ydb/public/api/protos/draft/fq.proto
@@ -501,6 +501,14 @@ message GreenplumCluster {
IamAuth auth = 6;
}
+message MySQLCluster {
+ string database_id = 1 [(Ydb.length).le = 1024];
+ string database_name = 2 [(Ydb.length).le = 1024];
+ string login = 3 [(Ydb.length).le = 1024, (Ydb.sensitive) = true];
+ string password = 4 [(Ydb.length).le = 1024, (Ydb.sensitive) = true];
+ IamAuth auth = 5;
+}
+
message ConnectionSetting {
enum ConnectionType {
CONNECTION_TYPE_UNSPECIFIED = 0;
@@ -511,6 +519,7 @@ message ConnectionSetting {
MONITORING = 5;
POSTGRESQL_CLUSTER = 6;
GREENPLUM_CLUSTER = 7;
+ MYSQL_CLUSTER = 8;
}
oneof connection {
@@ -521,6 +530,7 @@ message ConnectionSetting {
Monitoring monitoring = 5;
PostgreSQLCluster postgresql_cluster = 6;
GreenplumCluster greenplum_cluster = 7;
+ MySQLCluster mysql_cluster = 8;
}
}