summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <[email protected]>2023-09-01 15:38:30 +0300
committerhcpp <[email protected]>2023-09-01 16:43:47 +0300
commit11c90944b3271b1433532bbb339753d62b15d324 (patch)
tree4410962e52ef1ef8ab2baad21189f5f3bf8003f3
parenta48fac51f94eeab5d65d801d2b9eed3788746585 (diff)
pg and ch fq proxy
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt12
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt12
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt12
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt12
-rw-r--r--ydb/core/fq/libs/common/util.cpp147
-rw-r--r--ydb/core/fq/libs/common/util.h23
-rw-r--r--ydb/core/fq/libs/common/ya.make2
-rw-r--r--ydb/core/fq/libs/compute/common/config.h4
-rw-r--r--ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp23
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp183
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h6
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp26
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp33
13 files changed, 354 insertions, 141 deletions
diff --git a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt
index af28035aad6..3f433561b01 100644
--- a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt
@@ -7,6 +7,12 @@
add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(fq-libs-common)
target_compile_options(fq-libs-common PRIVATE
@@ -15,6 +21,7 @@ target_compile_options(fq-libs-common PRIVATE
target_link_libraries(fq-libs-common PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
library-cpp-blockcodecs
libs-control_plane_storage-events
fq-libs-events
@@ -29,3 +36,8 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
+generate_enum_serilization(fq-libs-common
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/common/util.h
+)
diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt
index eba7bc83ba7..008f49085bc 100644
--- a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt
@@ -7,6 +7,12 @@
add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(fq-libs-common)
target_compile_options(fq-libs-common PRIVATE
@@ -16,6 +22,7 @@ target_link_libraries(fq-libs-common PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
library-cpp-blockcodecs
libs-control_plane_storage-events
fq-libs-events
@@ -30,3 +37,8 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
+generate_enum_serilization(fq-libs-common
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/common/util.h
+)
diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt
index eba7bc83ba7..008f49085bc 100644
--- a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt
@@ -7,6 +7,12 @@
add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(fq-libs-common)
target_compile_options(fq-libs-common PRIVATE
@@ -16,6 +22,7 @@ target_link_libraries(fq-libs-common PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
library-cpp-blockcodecs
libs-control_plane_storage-events
fq-libs-events
@@ -30,3 +37,8 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
+generate_enum_serilization(fq-libs-common
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/common/util.h
+)
diff --git a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt
index af28035aad6..3f433561b01 100644
--- a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt
@@ -7,6 +7,12 @@
add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(fq-libs-common)
target_compile_options(fq-libs-common PRIVATE
@@ -15,6 +21,7 @@ target_compile_options(fq-libs-common PRIVATE
target_link_libraries(fq-libs-common PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
library-cpp-blockcodecs
libs-control_plane_storage-events
fq-libs-events
@@ -29,3 +36,8 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
+generate_enum_serilization(fq-libs-common
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/common/util.h
+)
diff --git a/ydb/core/fq/libs/common/util.cpp b/ydb/core/fq/libs/common/util.cpp
index 053de088890..32462a4af95 100644
--- a/ydb/core/fq/libs/common/util.cpp
+++ b/ydb/core/fq/libs/common/util.cpp
@@ -6,6 +6,42 @@
namespace NFq {
+namespace {
+
+TString GetServiceAccountId(const FederatedQuery::IamAuth& auth) {
+ return auth.has_service_account()
+ ? auth.service_account().id()
+ : TString{};
+}
+
+EYdbComputeAuth GetIamAuthMethod(const FederatedQuery::IamAuth& auth) {
+ switch (auth.identity_case()) {
+ case FederatedQuery::IamAuth::kNone:
+ return EYdbComputeAuth::NONE;
+ case FederatedQuery::IamAuth::kServiceAccount:
+ return EYdbComputeAuth::SERVICE_ACCOUNT;
+ case FederatedQuery::IamAuth::kCurrentIam:
+ // Do not replace with default. Adding a new auth item should cause a compilation error
+ case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
+ return EYdbComputeAuth::UNKNOWN;
+ }
+}
+
+EYdbComputeAuth GetBasicAuthMethod(const FederatedQuery::IamAuth& auth) {
+ switch (auth.identity_case()) {
+ case FederatedQuery::IamAuth::kNone:
+ return EYdbComputeAuth::BASIC;
+ case FederatedQuery::IamAuth::kServiceAccount:
+ return EYdbComputeAuth::MDB_BASIC;
+ case FederatedQuery::IamAuth::kCurrentIam:
+ // Do not replace with default. Adding a new auth item should cause a compilation error
+ case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
+ return EYdbComputeAuth::UNKNOWN;
+ }
+}
+
+}
+
TString EscapeString(const TString& value,
const TString& enclosingSeq,
const TString& replaceWith) {
@@ -35,4 +71,115 @@ TString EncloseAndEscapeString(const TString& value,
<< enclosingSeq;
}
+TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting) {
+ switch (setting.connection_case()) {
+ case FederatedQuery::ConnectionSetting::kYdbDatabase: {
+ return GetServiceAccountId(setting.ydb_database().auth());
+ }
+ case FederatedQuery::ConnectionSetting::kDataStreams: {
+ return GetServiceAccountId(setting.data_streams().auth());
+ }
+ case FederatedQuery::ConnectionSetting::kObjectStorage: {
+ return GetServiceAccountId(setting.object_storage().auth());
+ }
+ case FederatedQuery::ConnectionSetting::kMonitoring: {
+ return GetServiceAccountId(setting.monitoring().auth());
+ }
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster: {
+ return GetServiceAccountId(setting.clickhouse_cluster().auth());
+ }
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
+ return GetServiceAccountId(setting.postgresql_cluster().auth());
+ }
+ // Do not replace with default. Adding a new connection should cause a compilation error
+ case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ break;
+ }
+ return {};
+}
+
+TString ExtractServiceAccountId(const FederatedQuery::ConnectionContent& content) {
+ return ExtractServiceAccountId(content.setting());
+}
+
+TString ExtractServiceAccountId(const FederatedQuery::Connection& connection) {
+ return ExtractServiceAccountId(connection.content());
+}
+
+TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
+ switch (setting.connection_case()) {
+ case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ return {};
+ case FederatedQuery::ConnectionSetting::kYdbDatabase:
+ return {};
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster:
+ return setting.clickhouse_cluster().login();
+ case FederatedQuery::ConnectionSetting::kDataStreams:
+ return {};
+ case FederatedQuery::ConnectionSetting::kObjectStorage:
+ return {};
+ case FederatedQuery::ConnectionSetting::kMonitoring:
+ return {};
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
+ return setting.postgresql_cluster().login();
+ }
+}
+
+TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
+ switch (setting.connection_case()) {
+ case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ return {};
+ case FederatedQuery::ConnectionSetting::kYdbDatabase:
+ return {};
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster:
+ return setting.clickhouse_cluster().password();
+ case FederatedQuery::ConnectionSetting::kDataStreams:
+ return {};
+ case FederatedQuery::ConnectionSetting::kObjectStorage:
+ return {};
+ case FederatedQuery::ConnectionSetting::kMonitoring:
+ return {};
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
+ return setting.postgresql_cluster().password();
+ }
+}
+
+EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting& setting) {
+ switch (setting.connection_case()) {
+ case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ return EYdbComputeAuth::UNKNOWN;
+ case FederatedQuery::ConnectionSetting::kYdbDatabase:
+ return GetIamAuthMethod(setting.ydb_database().auth());
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster:
+ return GetBasicAuthMethod(setting.clickhouse_cluster().auth());
+ case FederatedQuery::ConnectionSetting::kDataStreams:
+ return GetIamAuthMethod(setting.data_streams().auth());
+ case FederatedQuery::ConnectionSetting::kObjectStorage:
+ return GetIamAuthMethod(setting.object_storage().auth());
+ case FederatedQuery::ConnectionSetting::kMonitoring:
+ return GetIamAuthMethod(setting.monitoring().auth());
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
+ return GetBasicAuthMethod(setting.postgresql_cluster().auth());
+ }
+}
+
+FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
+ switch (connection.content().setting().connection_case()) {
+ case FederatedQuery::ConnectionSetting::kObjectStorage:
+ return connection.content().setting().object_storage().auth();
+ case FederatedQuery::ConnectionSetting::kYdbDatabase:
+ return connection.content().setting().ydb_database().auth();
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster:
+ return connection.content().setting().clickhouse_cluster().auth();
+ case FederatedQuery::ConnectionSetting::kDataStreams:
+ return connection.content().setting().data_streams().auth();
+ case FederatedQuery::ConnectionSetting::kMonitoring:
+ return connection.content().setting().monitoring().auth();
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
+ return connection.content().setting().postgresql_cluster().auth();
+ case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ return FederatedQuery::IamAuth{};
+ }
+}
+
} // namespace NFq
diff --git a/ydb/core/fq/libs/common/util.h b/ydb/core/fq/libs/common/util.h
index b7aedf6e6c4..41a78006887 100644
--- a/ydb/core/fq/libs/common/util.h
+++ b/ydb/core/fq/libs/common/util.h
@@ -4,6 +4,7 @@
#include <array>
#include <google/protobuf/repeated_field.h>
+#include <ydb/public/api/protos/draft/fq.pb.h>
#include <library/cpp/iterator/mapped.h>
#include <util/generic/string.h>
@@ -12,6 +13,14 @@
namespace NFq {
+enum class EYdbComputeAuth {
+ UNKNOWN,
+ NONE,
+ SERVICE_ACCOUNT,
+ BASIC,
+ MDB_BASIC
+};
+
template<std::size_t K, typename T, std::size_t N>
auto CreateArray(const T(&list)[N]) -> std::array<T, K> {
static_assert(N == K, "not valid array size");
@@ -46,4 +55,18 @@ TString EncloseAndEscapeString(const TString& value,
const TString& enclosingSeq,
const TString& replaceWith);
+TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting);
+
+TString ExtractServiceAccountId(const FederatedQuery::ConnectionContent& content);
+
+TString ExtractServiceAccountId(const FederatedQuery::Connection& connection);
+
+TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting);
+
+TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting);
+
+EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting& setting);
+
+FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection);
+
} // namespace NFq
diff --git a/ydb/core/fq/libs/common/ya.make b/ydb/core/fq/libs/common/ya.make
index 3246b6455ec..77aef7dd380 100644
--- a/ydb/core/fq/libs/common/ya.make
+++ b/ydb/core/fq/libs/common/ya.make
@@ -1,5 +1,7 @@
LIBRARY()
+GENERATE_ENUM_SERIALIZATION(util.h)
+
SRCS(
cache.h
compression.cpp
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h
index 974d53e15d4..8a43d6a8208 100644
--- a/ydb/core/fq/libs/compute/common/config.h
+++ b/ydb/core/fq/libs/compute/common/config.h
@@ -144,12 +144,12 @@ public:
const FederatedQuery::ConnectionSetting::ConnectionCase& connectionCase) const {
switch (connectionCase) {
case FederatedQuery::ConnectionSetting::kObjectStorage:
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster:
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return true;
case FederatedQuery::ConnectionSetting::kYdbDatabase:
- case FederatedQuery::ConnectionSetting::kClickhouseCluster:
case FederatedQuery::ConnectionSetting::kDataStreams:
case FederatedQuery::ConnectionSetting::kMonitoring:
- case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
return false;
}
diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
index 5524f1921ce..dbfb7761486 100644
--- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
@@ -1,6 +1,6 @@
#include "synchronization_service.h"
-
+#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/compute/common/config.h>
#include <ydb/core/fq/libs/compute/common/utils.h>
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
@@ -22,7 +22,6 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
-
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [SynchronizationService]: " << stream)
#define LOG_W(stream) LOG_WARN_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [SynchronizationService]: " << stream)
#define LOG_I(stream) LOG_INFO_S( *TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] [SynchronizationService]: " << stream)
@@ -288,32 +287,12 @@ private:
return {};
}
- FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
- switch (connection.content().setting().connection_case()) {
- case FederatedQuery::ConnectionSetting::kObjectStorage:
- return connection.content().setting().object_storage().auth();
- case FederatedQuery::ConnectionSetting::kYdbDatabase:
- return connection.content().setting().ydb_database().auth();
- case FederatedQuery::ConnectionSetting::kClickhouseCluster:
- return connection.content().setting().clickhouse_cluster().auth();
- case FederatedQuery::ConnectionSetting::kDataStreams:
- return connection.content().setting().data_streams().auth();
- case FederatedQuery::ConnectionSetting::kMonitoring:
- return connection.content().setting().monitoring().auth();
- case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
- return connection.content().setting().postgresql_cluster().auth();
- case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
- return FederatedQuery::IamAuth{};
- }
- }
-
void ExcludeUnsupportedExternalDataSources() {
TVector<TString> excludeIds;
for (const auto& [_, connection]: Connections) {
const auto& meta = connection.meta();
const auto& content = connection.content();
const auto& setting = content.setting();
-
if (!ComputeConfig.IsConnectionCaseEnabled(setting.connection_case())) {
LOG_I("Exclude connection by type: scope = " << Scope << " , id = " << meta.id() << ", type = " << static_cast<int>(setting.connection_case()));
excludeIds.push_back(meta.id());
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
index c28ea7732a7..6dab875b23a 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
@@ -92,52 +92,78 @@ TString SignAccountId(const TString& id, const TSigner::TPtr& signer) {
return signer ? signer->SignAccountId(id) : TString{};
}
-TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth,
- const TString& name,
- const TSigner::TPtr& signer) {
+TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::ConnectionSetting& setting,
+ const TString& name,
+ const TSigner::TPtr& signer) {
using namespace fmt::literals;
- switch (auth.identity_case()) {
- case FederatedQuery::IamAuth::kServiceAccount: {
- if (!signer) {
- return {};
- }
- return fmt::format(R"(
- UPSERT OBJECT {external_source} (TYPE SECRET) WITH value={signature};
- )",
- "external_source"_a = EncloseAndEscapeString(name, '`'),
- "signature"_a = EncloseAndEscapeString(
- SignAccountId(auth.service_account().id(), signer), '`'));
- }
- case FederatedQuery::IamAuth::kNone:
- case FederatedQuery::IamAuth::kCurrentIam:
- // Do not replace with default. Adding a new auth item should cause a compilation error
- case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
- return {};
+ TString secretObjects;
+ auto serviceAccountId = ExtractServiceAccountId(setting);
+ if (serviceAccountId) {
+ secretObjects = signer ? fmt::format(
+ R"(
+ UPSERT OBJECT {sa_secret_name} (TYPE SECRET) WITH value={signature};
+ )",
+ "sa_secret_name"_a = EncloseAndEscapeString("k1" + name, '`'),
+ "signature"_a = EncloseAndEscapeString(SignAccountId(serviceAccountId, signer), '`')) : std::string{};
}
+
+ auto password = GetPassword(setting);
+ if (password) {
+ secretObjects += fmt::format(
+ R"(
+ UPSERT OBJECT {password_secret_name} (TYPE SECRET) WITH value={password};
+ )",
+ "password_secret_name"_a = EncloseAndEscapeString("k2" + name, '`'),
+ "password"_a = EncloseAndEscapeString(*password, '`'));
+ }
+
+ return secretObjects ? secretObjects : TMaybe<TString>{};
}
-TString CreateAuthParamsQuery(const FederatedQuery::IamAuth& auth,
+TString CreateAuthParamsQuery(const FederatedQuery::ConnectionSetting& setting,
const TString& name,
const TSigner::TPtr& signer) {
using namespace fmt::literals;
- switch (auth.identity_case()) {
- case FederatedQuery::IamAuth::kNone:
- return R"(, AUTH_METHOD="NONE")";
- case FederatedQuery::IamAuth::kServiceAccount:
- return fmt::format(R"(,
- AUTH_METHOD="SERVICE_ACCOUNT",
- SERVICE_ACCOUNT_ID={service_account_id},
- SERVICE_ACCOUNT_SECRET_NAME={secret_name}
- )",
- "service_account_id"_a =
- EncloseAndEscapeString(auth.service_account().id(), '"'),
- "external_source"_a = EncloseAndEscapeString(name, '"'),
- "secret_name"_a =
- EncloseAndEscapeString(signer ? name : TString{}, '"'));
- case FederatedQuery::IamAuth::kCurrentIam:
- // Do not replace with default. Adding a new auth item should cause a compilation error
- case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
+ auto authMethod = GetYdbComputeAuthMethod(setting);
+ switch (authMethod) {
+ case EYdbComputeAuth::UNKNOWN:
return {};
+ case EYdbComputeAuth::NONE:
+ return fmt::format(R"(, AUTH_METHOD="{auth_method}")", "auth_method"_a = ToString(authMethod));
+ case EYdbComputeAuth::SERVICE_ACCOUNT:
+ return fmt::format(
+ R"(,
+ AUTH_METHOD="{auth_method}",
+ SERVICE_ACCOUNT_ID={service_account_id},
+ SERVICE_ACCOUNT_SECRET_NAME={sa_secret_name}
+ )",
+ "auth_method"_a = ToString(authMethod),
+ "service_account_id"_a = EncloseAndEscapeString(ExtractServiceAccountId(setting), '"'),
+ "sa_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"'));
+ case EYdbComputeAuth::BASIC:
+ return fmt::format(
+ R"(,
+ AUTH_METHOD="{auth_method}",
+ LOGIN={login},
+ PASSWORD_SECRET_NAME={password_secret_name}
+ )",
+ "auth_method"_a = ToString(authMethod),
+ "login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'),
+ "password_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"'));
+ case EYdbComputeAuth::MDB_BASIC:
+ return fmt::format(
+ R"(,
+ AUTH_METHOD="{auth_method}",
+ SERVICE_ACCOUNT_ID="{service_account_id}",
+ SERVICE_ACCOUNT_SECRET_NAME="{sa_secret_name}",
+ LOGIN={login},
+ PASSWORD_SECRET_NAME={password_secret_name}
+ )",
+ "auth_method"_a = ToString(authMethod),
+ "service_account_id"_a = EncloseAndEscapeString(ExtractServiceAccountId(setting), '"'),
+ "sa_secret_name"_a = EncloseAndEscapeString(signer ? "k1" + name : TString{}, '"'),
+ "login"_a = EncloseAndEscapeString(GetLogin(setting).GetOrElse({}), '"'),
+ "password_secret_name"_a = EncloseAndEscapeString(signer ? "k2" + name : TString{}, '"'));
}
}
@@ -147,44 +173,77 @@ TString MakeCreateExternalDataSourceQuery(
const TSigner::TPtr& signer) {
using namespace fmt::literals;
- auto sourceName = connectionContent.name();
- auto bucketName = connectionContent.setting().object_storage().bucket();
+ TString sourceType;
+ TString locationValue;
+ TString locationKey;
+ switch (connectionContent.setting().connection_case()) {
+ case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ case FederatedQuery::ConnectionSetting::kYdbDatabase:
+ break;
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster:
+ locationKey = "MDB_CLUSTER_ID";
+ sourceType = "ClickHouse";
+ locationValue = EscapeString(connectionContent.setting().clickhouse_cluster().database_id(), '"');
+ break;
+ case FederatedQuery::ConnectionSetting::kDataStreams:
+ break;
+ case FederatedQuery::ConnectionSetting::kObjectStorage: {
+ locationKey = "LOCATION";
+ auto bucketName = connectionContent.setting().object_storage().bucket();
+ locationValue = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/";
+ sourceType = "ObjectStorage";
+ break;
+ }
+ case FederatedQuery::ConnectionSetting::kMonitoring:
+ break;
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
+ locationKey = "MDB_CLUSTER_ID";
+ locationValue = EscapeString(connectionContent.setting().postgresql_cluster().database_id(), '"');
+ sourceType = "PostgreSQL";
+ break;
+ }
+ auto sourceName = connectionContent.name();
return fmt::format(
R"(
CREATE EXTERNAL DATA SOURCE {external_source} WITH (
- SOURCE_TYPE="ObjectStorage",
- LOCATION="{location}"
+ SOURCE_TYPE="{source_type}",
+ {location_key}="{location_value}"
{auth_params}
);
)",
+ "source_type"_a = sourceType,
"external_source"_a = EncloseAndEscapeString(sourceName, '`'),
- "location"_a = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/",
+ "location_key"_a = locationKey,
+ "location_value"_a = locationValue,
"auth_params"_a =
- CreateAuthParamsQuery(connectionContent.setting().object_storage().auth(),
+ CreateAuthParamsQuery(connectionContent.setting(),
connectionContent.name(),
signer));
}
-TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth,
- const TString& name,
- const TSigner::TPtr& signer) {
+TMaybe<TString> DropSecretObjectQuery(const TString& name) {
using namespace fmt::literals;
- switch (auth.identity_case()) {
- case FederatedQuery::IamAuth::kServiceAccount: {
- if (!signer) {
- return {};
- }
- return fmt::format("DROP OBJECT {secret_name} (TYPE SECRET);",
- "secret_name"_a =
- EncloseAndEscapeString(name, '`'));
- }
- case FederatedQuery::IamAuth::kNone:
- case FederatedQuery::IamAuth::kCurrentIam:
- // Do not replace with default. Adding a new auth item should cause a compilation error
- case FederatedQuery::IamAuth::IDENTITY_NOT_SET:
- return {};
- }
+ return fmt::format(
+ R"(
+ DROP OBJECT {secret_name1} (TYPE SECRET);
+ DROP OBJECT {secret_name2} (TYPE SECRET);
+ DROP OBJECT {secret_name3} (TYPE SECRET); -- for backward compatibility
+ )",
+ "secret_name1"_a = EncloseAndEscapeString("k1" + name, '`'),
+ "secret_name2"_a = EncloseAndEscapeString("k2" + name, '`'),
+ "secret_name3"_a = EncloseAndEscapeString(name, '`'));
+}
+
+TString MakeDeleteExternalDataSourceQuery(
+ const FederatedQuery::ConnectionContent& connectionContent,
+ const TSigner::TPtr&) {
+ using namespace fmt::literals;
+ return fmt::format(
+ R"(
+ DROP EXTERNAL DATA SOURCE {external_source};
+ )",
+ "external_source"_a = EncloseAndEscapeString(connectionContent.name(), '`'));
}
TString MakeDeleteExternalDataTableQuery(const TString& tableName) {
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
index 6c4a3ac44ce..ef589356dfe 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
@@ -7,13 +7,11 @@
namespace NFq {
namespace NPrivate {
-TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth,
+TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::ConnectionSetting& setting,
const TString& name,
const TSigner::TPtr& signer);
-TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth,
- const TString& name,
- const TSigner::TPtr& signer);
+TMaybe<TString> DropSecretObjectQuery(const TString& name);
TString MakeCreateExternalDataSourceQuery(
const FederatedQuery::ConnectionContent& connectionContent,
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
index aa93b508b3f..3328e173799 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -477,7 +477,7 @@ NActors::IActor* MakeCreateConnectionActor(
auto& connectionContent = request->Get()->Request.content();
auto createSecretStatement =
- CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(),
+ CreateSecretObjectQuery(connectionContent.setting(),
connectionContent.name(),
signer);
@@ -485,10 +485,7 @@ NActors::IActor* MakeCreateConnectionActor(
if (createSecretStatement) {
statements.push_back(
TSchemaQueryTask{.SQL = *createSecretStatement,
- .RollbackSQL = DropSecretObjectQuery(
- connectionContent.setting().object_storage().auth(),
- connectionContent.name(),
- signer)});
+ .RollbackSQL = DropSecretObjectQuery(connectionContent.name())});
}
TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
@@ -556,11 +553,9 @@ NActors::IActor* MakeModifyConnectionActor(
auto& newConnectionContent = request->Get()->Request.content();
auto dropOldSecret =
- DropSecretObjectQuery(oldConnectionContent.setting().object_storage().auth(),
- oldConnectionContent.name(),
- signer);
+ DropSecretObjectQuery(oldConnectionContent.name());
auto createNewSecret =
- CreateSecretObjectQuery(newConnectionContent.setting().object_storage().auth(),
+ CreateSecretObjectQuery(newConnectionContent.setting(),
newConnectionContent.name(),
signer);
std::vector<TSchemaQueryTask> statements;
@@ -593,17 +588,14 @@ NActors::IActor* MakeModifyConnectionActor(
statements.push_back(
TSchemaQueryTask{.SQL = *dropOldSecret,
.RollbackSQL = CreateSecretObjectQuery(
- oldConnectionContent.setting().object_storage().auth(),
+ oldConnectionContent.setting(),
oldConnectionContent.name(),
signer)});
}
if (createNewSecret) {
statements.push_back(
TSchemaQueryTask{.SQL = *createNewSecret,
- .RollbackSQL = DropSecretObjectQuery(
- newConnectionContent.setting().object_storage().auth(),
- newConnectionContent.name(),
- signer)});
+ .RollbackSQL = DropSecretObjectQuery(newConnectionContent.name())});
}
statements.push_back(
@@ -666,9 +658,7 @@ NActors::IActor* MakeDeleteConnectionActor(
auto& connectionContent = *request->Get()->ConnectionContent;
auto dropSecret =
- DropSecretObjectQuery(connectionContent.setting().object_storage().auth(),
- connectionContent.name(),
- signer);
+ DropSecretObjectQuery(connectionContent.name());
std::vector<TSchemaQueryTask> statements = {TSchemaQueryTask{
.SQL = TString{MakeDeleteExternalDataSourceQuery(connectionContent.name())},
@@ -679,7 +669,7 @@ NActors::IActor* MakeDeleteConnectionActor(
statements.push_back(
TSchemaQueryTask{.SQL = *dropSecret,
.RollbackSQL = CreateSecretObjectQuery(
- connectionContent.setting().object_storage().auth(),
+ connectionContent.setting(),
connectionContent.name(),
signer)});
}
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
index fea916f03a9..faf8fca794d 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp
@@ -29,39 +29,6 @@ struct TTaskInternal {
TString NewTenantName;
};
- TString GetServiceAccountId(const FederatedQuery::IamAuth& auth) {
- return auth.has_service_account()
- ? auth.service_account().id()
- : TString{};
- }
-
- TString ExtractServiceAccountId(const FederatedQuery::Connection& c) {
- switch (c.content().setting().connection_case()) {
- case FederatedQuery::ConnectionSetting::kYdbDatabase: {
- return GetServiceAccountId(c.content().setting().ydb_database().auth());
- }
- case FederatedQuery::ConnectionSetting::kDataStreams: {
- return GetServiceAccountId(c.content().setting().data_streams().auth());
- }
- case FederatedQuery::ConnectionSetting::kObjectStorage: {
- return GetServiceAccountId(c.content().setting().object_storage().auth());
- }
- case FederatedQuery::ConnectionSetting::kMonitoring: {
- return GetServiceAccountId(c.content().setting().monitoring().auth());
- }
- case FederatedQuery::ConnectionSetting::kClickhouseCluster: {
- return GetServiceAccountId(c.content().setting().clickhouse_cluster().auth());
- }
- case FederatedQuery::ConnectionSetting::kPostgresqlCluster: {
- return GetServiceAccountId(c.content().setting().postgresql_cluster().auth());
- }
- // Do not replace with default. Adding a new connection should cause a compilation error
- case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
- break;
- }
- return {};
- }
-
std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) {
if (taskInternal.ShouldSkipTask) {