diff options
author | hcpp <[email protected]> | 2023-09-01 15:38:30 +0300 |
---|---|---|
committer | hcpp <[email protected]> | 2023-09-01 16:43:47 +0300 |
commit | 11c90944b3271b1433532bbb339753d62b15d324 (patch) | |
tree | 4410962e52ef1ef8ab2baad21189f5f3bf8003f3 | |
parent | a48fac51f94eeab5d65d801d2b9eed3788746585 (diff) |
pg and ch fq proxy
-rw-r--r-- | ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt | 12 | ||||
-rw-r--r-- | ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt | 12 | ||||
-rw-r--r-- | ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt | 12 | ||||
-rw-r--r-- | ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt | 12 | ||||
-rw-r--r-- | ydb/core/fq/libs/common/util.cpp | 147 | ||||
-rw-r--r-- | ydb/core/fq/libs/common/util.h | 23 | ||||
-rw-r--r-- | ydb/core/fq/libs/common/ya.make | 2 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/common/config.h | 4 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp | 23 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp | 183 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h | 6 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp | 26 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp | 33 |
13 files changed, 354 insertions, 141 deletions
diff --git a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt index af28035aad6..3f433561b01 100644 --- a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt @@ -7,6 +7,12 @@ add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(fq-libs-common) target_compile_options(fq-libs-common PRIVATE @@ -15,6 +21,7 @@ target_compile_options(fq-libs-common PRIVATE target_link_libraries(fq-libs-common PUBLIC contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime library-cpp-blockcodecs libs-control_plane_storage-events fq-libs-events @@ -29,3 +36,8 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) +generate_enum_serilization(fq-libs-common + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h + INCLUDE_HEADERS + ydb/core/fq/libs/common/util.h +) diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt index eba7bc83ba7..008f49085bc 100644 --- a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt @@ -7,6 +7,12 @@ add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(fq-libs-common) target_compile_options(fq-libs-common PRIVATE @@ -16,6 +22,7 @@ target_link_libraries(fq-libs-common PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime library-cpp-blockcodecs libs-control_plane_storage-events fq-libs-events @@ -30,3 +37,8 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) +generate_enum_serilization(fq-libs-common + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h + INCLUDE_HEADERS + ydb/core/fq/libs/common/util.h +) diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt index eba7bc83ba7..008f49085bc 100644 --- a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt @@ -7,6 +7,12 @@ add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(fq-libs-common) target_compile_options(fq-libs-common PRIVATE @@ -16,6 +22,7 @@ target_link_libraries(fq-libs-common PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime library-cpp-blockcodecs libs-control_plane_storage-events fq-libs-events @@ -30,3 +37,8 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) +generate_enum_serilization(fq-libs-common + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h + INCLUDE_HEADERS + ydb/core/fq/libs/common/util.h +) diff --git a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt index af28035aad6..3f433561b01 100644 --- a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt @@ -7,6 +7,12 @@ add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(fq-libs-common) target_compile_options(fq-libs-common PRIVATE @@ -15,6 +21,7 @@ target_compile_options(fq-libs-common PRIVATE target_link_libraries(fq-libs-common PUBLIC contrib-libs-cxxsupp yutil + tools-enum_parser-enum_serialization_runtime library-cpp-blockcodecs libs-control_plane_storage-events fq-libs-events @@ -29,3 +36,8 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) +generate_enum_serilization(fq-libs-common + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h + INCLUDE_HEADERS + ydb/core/fq/libs/common/util.h +) diff --git a/ydb/core/fq/libs/common/util.cpp b/ydb/core/fq/libs/common/util.cpp index 053de088890..32462a4af95 100644 --- a/ydb/core/fq/libs/common/util.cpp +++ b/ydb/core/fq/libs/common/util.cpp @@ -6,6 +6,42 @@ namespace NFq { +namespace { + +TString GetServiceAccountId(const FederatedQuery::IamAuth& auth) { + return auth.has_service_account() + ? auth.service_account().id() + : TString{}; +} + +EYdbComputeAuth GetIamAuthMethod(const FederatedQuery::IamAuth& auth) { + switch (auth.identity_case()) { + case FederatedQuery::IamAuth::kNone: + return EYdbComputeAuth::NONE; + case FederatedQuery::IamAuth::kServiceAccount: + return EYdbComputeAuth::SERVICE_ACCOUNT; + case FederatedQuery::IamAuth::kCurrentIam: + // Do not replace with default. Adding a new auth item should cause a compilation error + case FederatedQuery::IamAuth::IDENTITY_NOT_SET: + return EYdbComputeAuth::UNKNOWN; + } +} + +EYdbComputeAuth GetBasicAuthMethod(const FederatedQuery::IamAuth& auth) { + switch (auth.identity_case()) { + case FederatedQuery::IamAuth::kNone: + return EYdbComputeAuth::BASIC; + case FederatedQuery::IamAuth::kServiceAccount: + return EYdbComputeAuth::MDB_BASIC; + case FederatedQuery::IamAuth::kCurrentIam: + // Do not replace with default. Adding a new auth item should cause a compilation error + case FederatedQuery::IamAuth::IDENTITY_NOT_SET: + return EYdbComputeAuth::UNKNOWN; + } +} + +} + TString EscapeString(const TString& value, const TString& enclosingSeq, const TString& replaceWith) { @@ -35,4 +71,115 @@ TString EncloseAndEscapeString(const TString& value, << enclosingSeq; } +TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting) { + switch (setting.connection_case()) { + case FederatedQuery::ConnectionSetting::kYdbDatabase: { + return GetServiceAccountId(setting.ydb_database().auth()); + } + case FederatedQuery::ConnectionSetting::kDataStreams: { + return GetServiceAccountId(setting.data_streams().auth()); + } + case FederatedQuery::ConnectionSetting::kObjectStorage: { + return GetServiceAccountId(setting.object_storage().auth()); + } + case FederatedQuery::ConnectionSetting::kMonitoring: { + return GetServiceAccountId(setting.monitoring().auth()); + } + case FederatedQuery::ConnectionSetting::kClickhouseCluster: { + return GetServiceAccountId(setting.clickhouse_cluster().auth()); + } + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: { + return GetServiceAccountId(setting.postgresql_cluster().auth()); + } + // Do not replace with default. Adding a new connection should cause a compilation error + case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + break; + } + return {}; +} + +TString ExtractServiceAccountId(const FederatedQuery::ConnectionContent& content) { + return ExtractServiceAccountId(content.setting()); +} + +TString ExtractServiceAccountId(const FederatedQuery::Connection& connection) { + return ExtractServiceAccountId(connection.content()); +} + +TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) { + switch (setting.connection_case()) { + case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + return {}; + case FederatedQuery::ConnectionSetting::kYdbDatabase: + return {}; + case FederatedQuery::ConnectionSetting::kClickhouseCluster: + return setting.clickhouse_cluster().login(); + case FederatedQuery::ConnectionSetting::kDataStreams: + return {}; + case FederatedQuery::ConnectionSetting::kObjectStorage: + return {}; + case FederatedQuery::ConnectionSetting::kMonitoring: + return {}; + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: + return setting.postgresql_cluster().login(); + } +} + +TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) { + switch (setting.connection_case()) { + case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + return {}; + case FederatedQuery::ConnectionSetting::kYdbDatabase: + return {}; + case FederatedQuery::ConnectionSetting::kClickhouseCluster: + return setting.clickhouse_cluster().password(); + case FederatedQuery::ConnectionSetting::kDataStreams: + return {}; + case FederatedQuery::ConnectionSetting::kObjectStorage: + return {}; + case FederatedQuery::ConnectionSetting::kMonitoring: + return {}; + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: + return setting.postgresql_cluster().password(); + } +} + +EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting& setting) { + switch (setting.connection_case()) { + case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + return EYdbComputeAuth::UNKNOWN; + case FederatedQuery::ConnectionSetting::kYdbDatabase: + return GetIamAuthMethod(setting.ydb_database().auth()); + case FederatedQuery::ConnectionSetting::kClickhouseCluster: + return GetBasicAuthMethod(setting.clickhouse_cluster().auth()); + case FederatedQuery::ConnectionSetting::kDataStreams: + return GetIamAuthMethod(setting.data_streams().auth()); + case FederatedQuery::ConnectionSetting::kObjectStorage: + return GetIamAuthMethod(setting.object_storage().auth()); + case FederatedQuery::ConnectionSetting::kMonitoring: + return GetIamAuthMethod(setting.monitoring().auth()); + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: + return GetBasicAuthMethod(setting.postgresql_cluster().auth()); + } +} + +FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) { + switch (connection.content().setting().connection_case()) { + case FederatedQuery::ConnectionSetting::kObjectStorage: + return connection.content().setting().object_storage().auth(); + case FederatedQuery::ConnectionSetting::kYdbDatabase: + return connection.content().setting().ydb_database().auth(); + case FederatedQuery::ConnectionSetting::kClickhouseCluster: + return connection.content().setting().clickhouse_cluster().auth(); + case FederatedQuery::ConnectionSetting::kDataStreams: + return connection.content().setting().data_streams().auth(); + case FederatedQuery::ConnectionSetting::kMonitoring: + return connection.content().setting().monitoring().auth(); + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: + return connection.content().setting().postgresql_cluster().auth(); + case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + return FederatedQuery::IamAuth{}; + } +} + } // namespace NFq diff --git a/ydb/core/fq/libs/common/util.h b/ydb/core/fq/libs/common/util.h index b7aedf6e6c4..41a78006887 100644 --- a/ydb/core/fq/libs/common/util.h +++ b/ydb/core/fq/libs/common/util.h @@ -4,6 +4,7 @@ #include <array> #include <google/protobuf/repeated_field.h> +#include <ydb/public/api/protos/draft/fq.pb.h> #include <library/cpp/iterator/mapped.h> #include <util/generic/string.h> @@ -12,6 +13,14 @@ namespace NFq { +enum class EYdbComputeAuth { + UNKNOWN, + NONE, + SERVICE_ACCOUNT, + BASIC, + MDB_BASIC +}; + template<std::size_t K, typename T, std::size_t N> auto CreateArray(const T(&list)[N]) -> std::array<T, K> { static_assert(N == K, "not valid array size"); @@ -46,4 +55,18 @@ TString EncloseAndEscapeString(const TString& value, const TString& enclosingSeq, const TString& replaceWith); +TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting); + +TString ExtractServiceAccountId(const FederatedQuery::ConnectionContent& content); + +TString ExtractServiceAccountId(const FederatedQuery::Connection& connection); + +TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting); + +TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting); + +EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting& setting); + +FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection); + } // namespace NFq diff --git a/ydb/core/fq/libs/common/ya.make b/ydb/core/fq/libs/common/ya.make index 3246b6455ec..77aef7dd380 100644 --- a/ydb/core/fq/libs/common/ya.make +++ b/ydb/core/fq/libs/common/ya.make @@ -1,5 +1,7 @@ LIBRARY() +GENERATE_ENUM_SERIALIZATION(util.h) + SRCS( cache.h compression.cpp diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h index 974d53e15d4..8a43d6a8208 100644 --- a/ydb/core/fq/libs/compute/common/config.h +++ b/ydb/core/fq/libs/compute/common/config.h @@ -144,12 +144,12 @@ public: const FederatedQuery::ConnectionSetting::ConnectionCase& connectionCase) const { switch (connectionCase) { case FederatedQuery::ConnectionSetting::kObjectStorage: + case FederatedQuery::ConnectionSetting::kClickhouseCluster: + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: return true; case FederatedQuery::ConnectionSetting::kYdbDatabase: - case FederatedQuery::ConnectionSetting::kClickhouseCluster: case FederatedQuery::ConnectionSetting::kDataStreams: case FederatedQuery::ConnectionSetting::kMonitoring: - case FederatedQuery::ConnectionSetting::kPostgresqlCluster: case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: return false; } diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp index 5524f1921ce..dbfb7761486 100644 --- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp @@ -1,6 +1,6 @@ #include "synchronization_service.h" - +#include <ydb/core/fq/libs/common/util.h> #include <ydb/core/fq/libs/compute/common/config.h> #include <ydb/core/fq/libs/compute/common/utils.h> #include <ydb/core/fq/libs/compute/ydb/events/events.h> @@ -22,7 +22,6 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> - #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [SynchronizationService]: " << stream) #define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [SynchronizationService]: " << stream) #define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [SynchronizationService]: " << stream) @@ -288,32 +287,12 @@ private: return {}; } - FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) { - switch (connection.content().setting().connection_case()) { - case FederatedQuery::ConnectionSetting::kObjectStorage: - return connection.content().setting().object_storage().auth(); - case FederatedQuery::ConnectionSetting::kYdbDatabase: - return connection.content().setting().ydb_database().auth(); - case FederatedQuery::ConnectionSetting::kClickhouseCluster: - return connection.content().setting().clickhouse_cluster().auth(); - case FederatedQuery::ConnectionSetting::kDataStreams: - return connection.content().setting().data_streams().auth(); - case FederatedQuery::ConnectionSetting::kMonitoring: - return connection.content().setting().monitoring().auth(); - case FederatedQuery::ConnectionSetting::kPostgresqlCluster: - return connection.content().setting().postgresql_cluster().auth(); - case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: - return FederatedQuery::IamAuth{}; - } - } - void ExcludeUnsupportedExternalDataSources() { TVector<TString> excludeIds; for (const auto& [_, connection]: Connections) { const auto& meta = connection.meta(); const auto& content = connection.content(); const auto& setting = content.setting(); - if (!ComputeConfig.IsConnectionCaseEnabled(setting.connection_case())) { LOG_I("Exclude connection by type: scope = " << Scope << " , id = " << meta.id() << ", type = " << static_cast<int>(setting.connection_case())); excludeIds.push_back(meta.id()); diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp index c28ea7732a7..6dab875b23a 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -92,52 +92,78 @@ TString SignAccountId(const TString& id, const TSigner::TPtr& signer) { return signer ? signer->SignAccountId(id) : TString{}; } -TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, - const TString& name, - const TSigner::TPtr& signer) { +TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::ConnectionSetting& setting, + const TString& name, + const TSigner::TPtr& signer) { using namespace fmt::literals; - switch (auth.identity_case()) { - case FederatedQuery::IamAuth::kServiceAccount: { - if (!signer) { - return {}; - } - return fmt::format(R"( - UPSERT OBJECT {external_source} (TYPE SECRET) WITH value={signature}; - )", - "external_source"_a = EncloseAndEscapeString(name, '`'), - "signature"_a = EncloseAndEscapeString( - SignAccountId(auth.service_account().id(), signer), '`')); - } - case FederatedQuery::IamAuth::kNone: - case FederatedQuery::IamAuth::kCurrentIam: - // Do not replace with default. Adding a new auth item should cause a compilation error - case FederatedQuery::IamAuth::IDENTITY_NOT_SET: - return {}; + TString secretObjects; + auto serviceAccountId = ExtractServiceAccountId(setting); + if (serviceAccountId) { + secretObjects = signer ? fmt::format( + R"( + UPSERT OBJECT {sa_secret_name} (TYPE SECRET) WITH value={signature}; + )", + "sa_secret_name"_a = EncloseAndEscapeString("k1" + name, '`'), + "signature"_a = EncloseAndEscapeString(SignAccountId(serviceAccountId, signer), '`')) : std::string{}; } + + auto password = GetPassword(setting); + if (password) { + secretObjects += fmt::format( + R"( + UPSERT OBJECT {password_secret_name} (TYPE SECRET) WITH value={password}; + )", + "password_secret_name"_a = EncloseAndEscapeString("k2" + name, '`'), + "password"_a = EncloseAndEscapeString(*password, '`')); + } + + return secretObjects ? secretObjects : TMaybe<TString>{}; } -TString CreateAuthParamsQuery(const FederatedQuery::IamAuth& auth, +TString CreateAuthParamsQuery(const FederatedQuery::ConnectionSetting& setting, const TString& name, const TSigner::TPtr& signer) { using namespace fmt::literals; - switch (auth.identity_case()) { - case FederatedQuery::IamAuth::kNone: - return R"(, AUTH_METHOD="NONE")"; - case FederatedQuery::IamAuth::kServiceAccount: - return fmt::format(R"(, - AUTH_METHOD="SERVICE_ACCOUNT", - SERVICE_ACCOUNT_ID={service_account_id}, - SERVICE_ACCOUNT_SECRET_NAME={secret_name} - )", - "service_account_id"_a = - EncloseAndEscapeString(auth.service_account().id(), '"'), - "external_source"_a = EncloseAndEscapeString(name, '"'), - "secret_name"_a = - EncloseAndEscapeString(signer ? name : TString{}, '"')); - case FederatedQuery::IamAuth::kCurrentIam: - // Do not replace with default. Adding a new auth item should cause a compilation error - case FederatedQuery::IamAuth::IDENTITY_NOT_SET: + auto authMethod = GetYdbComputeAuthMethod(setting); + switch (authMethod) { + case EYdbComputeAuth::UNKNOWN: return {}; + case EYdbComputeAuth::NONE: + return fmt::format(R"(, AUTH_METHOD="{auth_method}")", "auth_method"_a = ToString(authMethod)); + case EYdbComputeAuth::SERVICE_ACCOUNT: + return fmt::format( + R"(, + AUTH_METHOD="{auth_method}", + SERVICE_ACCOUNT_ID={service_account_id}, + SERVICE_ACCOUNT_SECRET_NAME={sa_secret_name} + )", + "auth_method"_a = ToString(authMethod), + "service_account_id"_a = EncloseAndEscapeString(ExtractServiceAccountId(setting), '"'), + "sa_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"')); + case EYdbComputeAuth::BASIC: + return fmt::format( + R"(, + AUTH_METHOD="{auth_method}", + LOGIN={login}, + PASSWORD_SECRET_NAME={password_secret_name} + )", + "auth_method"_a = ToString(authMethod), + "login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'), + "password_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"')); + case EYdbComputeAuth::MDB_BASIC: + return fmt::format( + R"(, + AUTH_METHOD="{auth_method}", + SERVICE_ACCOUNT_ID="{service_account_id}", + SERVICE_ACCOUNT_SECRET_NAME="{sa_secret_name}", + LOGIN={login}, + PASSWORD_SECRET_NAME={password_secret_name} + )", + "auth_method"_a = ToString(authMethod), + "service_account_id"_a = EncloseAndEscapeString(ExtractServiceAccountId(setting), '"'), + "sa_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"'), + "login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'), + "password_secret_name"_a = EncloseAndEscapeString(signer ? "k2" + name : TString{}, '"')); } } @@ -147,44 +173,77 @@ TString MakeCreateExternalDataSourceQuery( const TSigner::TPtr& signer) { using namespace fmt::literals; - auto sourceName = connectionContent.name(); - auto bucketName = connectionContent.setting().object_storage().bucket(); + TString sourceType; + TString locationValue; + TString locationKey; + switch (connectionContent.setting().connection_case()) { + case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + case FederatedQuery::ConnectionSetting::kYdbDatabase: + break; + case FederatedQuery::ConnectionSetting::kClickhouseCluster: + locationKey = "MDB_CLUSTER_ID"; + sourceType = "ClickHouse"; + locationValue = EscapeString(connectionContent.setting().clickhouse_cluster().database_id(), '"'); + break; + case FederatedQuery::ConnectionSetting::kDataStreams: + break; + case FederatedQuery::ConnectionSetting::kObjectStorage: { + locationKey = "LOCATION"; + auto bucketName = connectionContent.setting().object_storage().bucket(); + locationValue = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/"; + sourceType = "ObjectStorage"; + break; + } + case FederatedQuery::ConnectionSetting::kMonitoring: + break; + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: + locationKey = "MDB_CLUSTER_ID"; + locationValue = EscapeString(connectionContent.setting().postgresql_cluster().database_id(), '"'); + sourceType = "PostgreSQL"; + break; + } + auto sourceName = connectionContent.name(); return fmt::format( R"( CREATE EXTERNAL DATA SOURCE {external_source} WITH ( - SOURCE_TYPE="ObjectStorage", - LOCATION="{location}" + SOURCE_TYPE="{source_type}", + {location_key}="{location_value}" {auth_params} ); )", + "source_type"_a = sourceType, "external_source"_a = EncloseAndEscapeString(sourceName, '`'), - "location"_a = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/", + "location_key"_a = locationKey, + "location_value"_a = locationValue, "auth_params"_a = - CreateAuthParamsQuery(connectionContent.setting().object_storage().auth(), + CreateAuthParamsQuery(connectionContent.setting(), connectionContent.name(), signer)); } -TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth, - const TString& name, - const TSigner::TPtr& signer) { +TMaybe<TString> DropSecretObjectQuery(const TString& name) { using namespace fmt::literals; - switch (auth.identity_case()) { - case FederatedQuery::IamAuth::kServiceAccount: { - if (!signer) { - return {}; - } - return fmt::format("DROP OBJECT {secret_name} (TYPE SECRET);", - "secret_name"_a = - EncloseAndEscapeString(name, '`')); - } - case FederatedQuery::IamAuth::kNone: - case FederatedQuery::IamAuth::kCurrentIam: - // Do not replace with default. Adding a new auth item should cause a compilation error - case FederatedQuery::IamAuth::IDENTITY_NOT_SET: - return {}; - } + return fmt::format( + R"( + DROP OBJECT {secret_name1} (TYPE SECRET); + DROP OBJECT {secret_name2} (TYPE SECRET); + DROP OBJECT {secret_name3} (TYPE SECRET); -- for backward compatibility + )", + "secret_name1"_a = EncloseAndEscapeString("k1" + name, '`'), + "secret_name2"_a = EncloseAndEscapeString("k2" + name, '`'), + "secret_name3"_a = EncloseAndEscapeString(name, '`')); +} + +TString MakeDeleteExternalDataSourceQuery( + const FederatedQuery::ConnectionContent& connectionContent, + const TSigner::TPtr&) { + using namespace fmt::literals; + return fmt::format( + R"( + DROP EXTERNAL DATA SOURCE {external_source}; + )", + "external_source"_a = EncloseAndEscapeString(connectionContent.name(), '`')); } TString MakeDeleteExternalDataTableQuery(const TString& tableName) { diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h index 6c4a3ac44ce..ef589356dfe 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h @@ -7,13 +7,11 @@ namespace NFq { namespace NPrivate { -TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, +TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::ConnectionSetting& setting, const TString& name, const TSigner::TPtr& signer); -TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth, - const TString& name, - const TSigner::TPtr& signer); +TMaybe<TString> DropSecretObjectQuery(const TString& name); TString MakeCreateExternalDataSourceQuery( const FederatedQuery::ConnectionContent& connectionContent, diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp index aa93b508b3f..3328e173799 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -477,7 +477,7 @@ NActors::IActor* MakeCreateConnectionActor( auto& connectionContent = request->Get()->Request.content(); auto createSecretStatement = - CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(), + CreateSecretObjectQuery(connectionContent.setting(), connectionContent.name(), signer); @@ -485,10 +485,7 @@ NActors::IActor* MakeCreateConnectionActor( if (createSecretStatement) { statements.push_back( TSchemaQueryTask{.SQL = *createSecretStatement, - .RollbackSQL = DropSecretObjectQuery( - connectionContent.setting().object_storage().auth(), - connectionContent.name(), - signer)}); + .RollbackSQL = DropSecretObjectQuery(connectionContent.name())}); } TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod = @@ -556,11 +553,9 @@ NActors::IActor* MakeModifyConnectionActor( auto& newConnectionContent = request->Get()->Request.content(); auto dropOldSecret = - DropSecretObjectQuery(oldConnectionContent.setting().object_storage().auth(), - oldConnectionContent.name(), - signer); + DropSecretObjectQuery(oldConnectionContent.name()); auto createNewSecret = - CreateSecretObjectQuery(newConnectionContent.setting().object_storage().auth(), + CreateSecretObjectQuery(newConnectionContent.setting(), newConnectionContent.name(), signer); std::vector<TSchemaQueryTask> statements; @@ -593,17 +588,14 @@ NActors::IActor* MakeModifyConnectionActor( statements.push_back( TSchemaQueryTask{.SQL = *dropOldSecret, .RollbackSQL = CreateSecretObjectQuery( - oldConnectionContent.setting().object_storage().auth(), + oldConnectionContent.setting(), oldConnectionContent.name(), signer)}); } if (createNewSecret) { statements.push_back( TSchemaQueryTask{.SQL = *createNewSecret, - .RollbackSQL = DropSecretObjectQuery( - newConnectionContent.setting().object_storage().auth(), - newConnectionContent.name(), - signer)}); + .RollbackSQL = DropSecretObjectQuery(newConnectionContent.name())}); } statements.push_back( @@ -666,9 +658,7 @@ NActors::IActor* MakeDeleteConnectionActor( auto& connectionContent = *request->Get()->ConnectionContent; auto dropSecret = - DropSecretObjectQuery(connectionContent.setting().object_storage().auth(), - connectionContent.name(), - signer); + DropSecretObjectQuery(connectionContent.name()); std::vector<TSchemaQueryTask> statements = {TSchemaQueryTask{ .SQL = TString{MakeDeleteExternalDataSourceQuery(connectionContent.name())}, @@ -679,7 +669,7 @@ NActors::IActor* MakeDeleteConnectionActor( statements.push_back( TSchemaQueryTask{.SQL = *dropSecret, .RollbackSQL = CreateSecretObjectQuery( - connectionContent.setting().object_storage().auth(), + connectionContent.setting(), connectionContent.name(), signer)}); } diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index fea916f03a9..faf8fca794d 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -29,39 +29,6 @@ struct TTaskInternal { TString NewTenantName; }; - TString GetServiceAccountId(const FederatedQuery::IamAuth& auth) { - return auth.has_service_account() - ? auth.service_account().id() - : TString{}; - } - - TString ExtractServiceAccountId(const FederatedQuery::Connection& c) { - switch (c.content().setting().connection_case()) { - case FederatedQuery::ConnectionSetting::kYdbDatabase: { - return GetServiceAccountId(c.content().setting().ydb_database().auth()); - } - case FederatedQuery::ConnectionSetting::kDataStreams: { - return GetServiceAccountId(c.content().setting().data_streams().auth()); - } - case FederatedQuery::ConnectionSetting::kObjectStorage: { - return GetServiceAccountId(c.content().setting().object_storage().auth()); - } - case FederatedQuery::ConnectionSetting::kMonitoring: { - return GetServiceAccountId(c.content().setting().monitoring().auth()); - } - case FederatedQuery::ConnectionSetting::kClickhouseCluster: { - return GetServiceAccountId(c.content().setting().clickhouse_cluster().auth()); - } - case FederatedQuery::ConnectionSetting::kPostgresqlCluster: { - return GetServiceAccountId(c.content().setting().postgresql_cluster().auth()); - } - // Do not replace with default. Adding a new connection should cause a compilation error - case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: - break; - } - return {}; - } - std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) { if (taskInternal.ShouldSkipTask) { |