diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-08-22 19:01:00 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-08-22 19:22:23 +0300 |
commit | a42171b3cf1fa15a4276d31465f2b8621c9fbc86 (patch) | |
tree | 2ef8e27db4a81640c610bdd77cdc64034a07a945 | |
parent | 769d14120ef8e30363c7dd6870ce1b82552587c3 (diff) | |
download | ydb-a42171b3cf1fa15a4276d31465f2b8621c9fbc86.tar.gz |
Green yqv2-dev SLI
Initial implementation
13 files changed, 519 insertions, 221 deletions
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h index a332189cb21..b0616b1bff0 100644 --- a/ydb/core/fq/libs/compute/common/config.h +++ b/ydb/core/fq/libs/compute/common/config.h @@ -5,7 +5,6 @@ #include <util/digest/multi.h> #include <util/generic/algorithm.h> -#include <util/generic/algorithm.h> #include <util/generic/vector.h> #include <util/generic/yexception.h> @@ -103,10 +102,32 @@ public: NFq::NConfig::EComputeType::YDB); } + bool IsYDBSchemaOperationsEnabled( + const TString& scope, + const FederatedQuery::ConnectionSetting::ConnectionCase& connectionCase) const { + return IsConnectionCaseEnabled(connectionCase) && + YdbComputeControlPlaneEnabled(scope); + } + const NFq::NConfig::TComputeConfig& GetProto() const { return ComputeConfig; } + bool IsConnectionCaseEnabled( + const FederatedQuery::ConnectionSetting::ConnectionCase& connectionCase) const { + switch (connectionCase) { + case FederatedQuery::ConnectionSetting::kObjectStorage: + return true; + case FederatedQuery::ConnectionSetting::kYdbDatabase: + case FederatedQuery::ConnectionSetting::kClickhouseCluster: + case FederatedQuery::ConnectionSetting::kDataStreams: + case FederatedQuery::ConnectionSetting::kMonitoring: + case FederatedQuery::ConnectionSetting::kPostgresqlCluster: + case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + return false; + } + } + private: NFq::NConfig::TComputeConfig ComputeConfig; NFq::NConfig::EComputeType DefaultCompute; diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp index 3480c506d9b..be2c5c3ea43 100644 --- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp @@ -313,18 +313,10 @@ private: const auto& meta = connection.meta(); const auto& content = connection.content(); const auto& setting = content.setting(); - switch (setting.connection_case()) { - case FederatedQuery::ConnectionSetting::kObjectStorage: - break; - case FederatedQuery::ConnectionSetting::kYdbDatabase: - case FederatedQuery::ConnectionSetting::kClickhouseCluster: - case FederatedQuery::ConnectionSetting::kDataStreams: - case FederatedQuery::ConnectionSetting::kMonitoring: - case FederatedQuery::ConnectionSetting::kPostgresqlCluster: - case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: + + if (!ComputeConfig.IsConnectionCaseEnabled(setting.connection_case())) { LOG_I("Exclude connection by type: scope = " << Scope << " , id = " << meta.id() << ", type = " << static_cast<int>(setting.connection_case())); excludeIds.push_back(meta.id()); - break; } switch (content.acl().visibility()) { diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp index 69adc07df2a..d9666ad965c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp @@ -44,28 +44,37 @@ private: using TEventRequestPtr = typename TEventRequest::TPtr; public: - using TCPSRequestFactory = + using TCPSProtoRequestFactory = std::function<typename TCPSEventRequest::TProto(const TEventRequestPtr& request)>; - using TErrorMessageFactoryMethod = std::function<TString(const NYql::TIssues& issues)>; - using TEntityNameExtractorFactoryMethod = + using TErrorMessageFactory = std::function<TString(const NYql::TIssues& issues)>; + using TCPSEventRequestPostProcessor = std::function<void(TCPSEventRequest& eventRequest)>; + using TResultHandler = std::function<void(const TEventRequestPtr& request, const typename TCPSEventResponse::TProto& response)>; - - TControlPlaneStorageRequesterActor(const TActorId& proxyActorId, - const TEventRequestPtr request, - TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters, - TPermissions permissions, - TCPSRequestFactory cpsRequestFactory, - TErrorMessageFactoryMethod errorMessageFactoryMethod, - TEntityNameExtractorFactoryMethod entityNameExtractorFactoryMethod) - : TBaseActor< - TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>>( + using TIssuesHandler = std::function<void(const TEventRequestPtr& request, + const typename NYql::TIssues& issues)>; + + TControlPlaneStorageRequesterActor( + const TActorId& proxyActorId, + const TEventRequestPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters, + TPermissions permissions, + TCPSProtoRequestFactory cpsProtoRequestFactory, + TErrorMessageFactory errorMessageFactory, + TResultHandler resultHandler, + TCPSEventRequestPostProcessor cpsEventRequestPostProcessor = + std::function([](TCPSEventRequest& a) { Y_UNUSED(a); })) + : TBaseActor<TControlPlaneStorageRequesterActor<TEventRequest, + TEventResponse, + TCPSEventRequest, + TCPSEventResponse>>( proxyActorId, std::move(request), requestTimeout, counters) , Permissions(permissions) - , CPSRequestFactory(cpsRequestFactory) - , ErrorMessageFactoryMethod(errorMessageFactoryMethod) - , EntityNameExtractorFactoryMethod(entityNameExtractorFactoryMethod) { } + , CPSProtoRequestFactory(cpsProtoRequestFactory) + , CPSEventRequestPostProcessor(cpsEventRequestPostProcessor) + , ErrorMessageFactory(errorMessageFactory) + , ResultHandler(resultHandler) { } static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_CONTROL_PLANE_STORAGE"; @@ -77,7 +86,7 @@ public: CPP_LOG_I("TControlPlaneStorageRequesterActor Sending CPS request. Actor id: " << TBase::SelfId()); const auto& request = Request; auto event = new TCPSEventRequest(request->Get()->Scope, - CPSRequestFactory(request), + CPSProtoRequestFactory(request), request->Get()->User, request->Get()->Token, request->Get()->CloudId, @@ -85,6 +94,7 @@ public: request->Get()->Quotas, request->Get()->TenantInfo, {}); + CPSEventRequestPostProcessor(*event); Send(ControlPlaneStorageServiceActorId(), event); } @@ -98,21 +108,22 @@ public: auto issues = event->Get()->Issues; if (!issues.Empty()) { CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished with issues. Actor id: " << TBase::SelfId()); - TString errorMessage = ErrorMessageFactoryMethod(issues); + TString errorMessage = ErrorMessageFactory(issues); TBase::HandleError(errorMessage, issues); return; } CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished successfully. Actor id: " << TBase::SelfId()); - EntityNameExtractorFactoryMethod(Request, event->Get()->Result); + ResultHandler(Request, event->Get()->Result); TBase::SendRequestToSender(); } private: TPermissions Permissions; - TCPSRequestFactory CPSRequestFactory; - TErrorMessageFactoryMethod ErrorMessageFactoryMethod; - TEntityNameExtractorFactoryMethod EntityNameExtractorFactoryMethod; + TCPSProtoRequestFactory CPSProtoRequestFactory; + TCPSEventRequestPostProcessor CPSEventRequestPostProcessor; + TErrorMessageFactory ErrorMessageFactory; + TResultHandler ResultHandler; }; /// Discover connection_name @@ -120,7 +131,49 @@ TString DescribeConnectionErrorMessageFactoryMethod(const NYql::TIssues& issues) Y_UNUSED(issues); return "Couldn't resolve connection"; }; -NActors::IActor* MakeDiscoverYDBConnectionName( + +NActors::IActor* MakeListConnectionActor( + const TActorId& proxyActorId, + const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event) { + FederatedQuery::ListConnectionsRequest result; + auto connectionName = event->Get()->Request.content().name(); + result.mutable_filter()->set_name(connectionName); + result.set_limit(2); + return result; + }; + auto cpsRequestPostProcessor = + [](TEvControlPlaneStorage::TEvListConnectionsRequest& event) { + event.IsExactNameMatch = true; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event, + const FederatedQuery::ListConnectionsResult& result) { + event->Get()->CPSListingFinished = true; + event->Get()->CPSConnectionCount = result.connection_size(); + }; + + return new TControlPlaneStorageRequesterActor< + TEvControlPlaneProxy::TEvCreateConnectionRequest, + TEvControlPlaneProxy::TEvCreateConnectionResponse, + TEvControlPlaneStorage::TEvListConnectionsRequest, + TEvControlPlaneStorage::TEvListConnectionsResponse>( + proxyActorId, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + DescribeConnectionErrorMessageFactoryMethod, + entityNameExtractorFactoryMethod, + cpsRequestPostProcessor); +} + +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request, TCounters& counters, @@ -153,7 +206,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName( entityNameExtractorFactoryMethod); } -NActors::IActor* MakeDiscoverYDBConnectionName( +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, TCounters& counters, @@ -186,7 +239,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName( entityNameExtractorFactoryMethod); } -NActors::IActor* MakeDiscoverYDBConnectionName( +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request, TCounters& counters, @@ -219,7 +272,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName( entityNameExtractorFactoryMethod); } -NActors::IActor* MakeDiscoverYDBConnectionName( +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, TCounters& counters, @@ -254,7 +307,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName( /// Discover binding_name -NActors::IActor* MakeDiscoverYDBBindingName( +NActors::IActor* MakeDiscoverYDBBindingNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, TCounters& counters, @@ -291,7 +344,7 @@ NActors::IActor* MakeDiscoverYDBBindingName( entityNameExtractorFactoryMethod); } -NActors::IActor* MakeDiscoverYDBBindingName( +NActors::IActor* MakeDiscoverYDBBindingNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request, TCounters& counters, @@ -328,7 +381,7 @@ NActors::IActor* MakeDiscoverYDBBindingName( entityNameExtractorFactoryMethod); } -NActors::IActor* MakeListBindingIds( +NActors::IActor* MakeListBindingIdsActor( const TActorId proxyActorId, const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, TCounters& counters, @@ -381,7 +434,7 @@ NActors::IActor* MakeListBindingIds( entityNameExtractorFactoryMethod); } -NActors::IActor* MakeDescribeListedBinding( +NActors::IActor* MakeDescribeListedBindingActor( const TActorId proxyActorId, const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, TCounters& counters, diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h index 38b6557e141..60c89e9f95c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h @@ -11,44 +11,51 @@ namespace NPrivate { using namespace NActors; /// Discover connection_name -NActors::IActor* MakeDiscoverYDBConnectionName( +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request, TCounters& counters, TDuration requestTimeout, TPermissions permissions); -NActors::IActor* MakeDiscoverYDBConnectionName( +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, TCounters& counters, TDuration requestTimeout, TPermissions permissions); -NActors::IActor* MakeDiscoverYDBConnectionName( +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request, TCounters& counters, TDuration requestTimeout, TPermissions permissions); -NActors::IActor* MakeDiscoverYDBConnectionName( +NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, TCounters& counters, TDuration requestTimeout, TPermissions permissions); +NActors::IActor* MakeListConnectionActor( + const TActorId& proxyActorId, + const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + /// Discover binding_name -NActors::IActor* MakeDiscoverYDBBindingName( +NActors::IActor* MakeDiscoverYDBBindingNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request, TCounters& counters, TDuration requestTimeout, TPermissions permissions); -NActors::IActor* MakeDiscoverYDBBindingName( +NActors::IActor* MakeDiscoverYDBBindingNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request, TCounters& counters, @@ -57,14 +64,14 @@ NActors::IActor* MakeDiscoverYDBBindingName( /// ModifyConnection -NActors::IActor* MakeListBindingIds( +NActors::IActor* MakeListBindingIdsActor( const TActorId sender, const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, TCounters& counters, TDuration requestTimeout, TPermissions permissions); -NActors::IActor* MakeDescribeListedBinding( +NActors::IActor* MakeDescribeListedBindingActor( const TActorId sender, const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, TCounters& counters, diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp index 494e44034aa..c28ea7732a7 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -92,9 +92,9 @@ TString SignAccountId(const TString& id, const TSigner::TPtr& signer) { return signer ? signer->SignAccountId(id) : TString{}; } -TString CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, - const TString& name, - const TSigner::TPtr& signer) { +TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, + const TString& name, + const TSigner::TPtr& signer) { using namespace fmt::literals; switch (auth.identity_case()) { case FederatedQuery::IamAuth::kServiceAccount: { @@ -152,7 +152,6 @@ TString MakeCreateExternalDataSourceQuery( return fmt::format( R"( - {upsert_object}; CREATE EXTERNAL DATA SOURCE {external_source} WITH ( SOURCE_TYPE="ObjectStorage", LOCATION="{location}" @@ -161,19 +160,15 @@ TString MakeCreateExternalDataSourceQuery( )", "external_source"_a = EncloseAndEscapeString(sourceName, '`'), "location"_a = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/", - "upsert_object"_a = - CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(), - connectionContent.name(), - signer), "auth_params"_a = CreateAuthParamsQuery(connectionContent.setting().object_storage().auth(), connectionContent.name(), signer)); } -TString DropSecretObjectQuery(const FederatedQuery::IamAuth& auth, - const TString& name, - const TSigner::TPtr& signer) { +TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth, + const TString& name, + const TSigner::TPtr& signer) { using namespace fmt::literals; switch (auth.identity_case()) { case FederatedQuery::IamAuth::kServiceAccount: { @@ -192,27 +187,17 @@ TString DropSecretObjectQuery(const FederatedQuery::IamAuth& auth, } } -TString MakeDeleteExternalDataSourceQuery( - const FederatedQuery::ConnectionContent& connectionContent, - const TSigner::TPtr& signer) { - using namespace fmt::literals; - return fmt::format( - R"( - {drop_secret_statement}; - DROP EXTERNAL DATA SOURCE {external_source}; - )", - "drop_secret_statement"_a = - DropSecretObjectQuery(connectionContent.setting().object_storage().auth(), - connectionContent.name(), - signer), - "external_source"_a = EncloseAndEscapeString(connectionContent.name(), '`')); -} - TString MakeDeleteExternalDataTableQuery(const TString& tableName) { using namespace fmt::literals; return fmt::format("DROP EXTERNAL TABLE {external_table};", "external_table"_a = EncloseAndEscapeString(tableName, '`')); } +TString MakeDeleteExternalDataSourceQuery(const TString& sourceName) { + using namespace fmt::literals; + return fmt::format("DROP EXTERNAL DATA SOURCE {external_source};", + "external_source"_a = EncloseAndEscapeString(sourceName, '`')); +} + } // namespace NPrivate } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h index 480ee99ee7f..6c4a3ac44ce 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h @@ -7,17 +7,23 @@ namespace NFq { namespace NPrivate { -TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content, - const TString& connectionName); +TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth, + const TString& name, + const TSigner::TPtr& signer); + +TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth, + const TString& name, + const TSigner::TPtr& signer); TString MakeCreateExternalDataSourceQuery( const FederatedQuery::ConnectionContent& connectionContent, const TString& objectStorageEndpoint, const TSigner::TPtr& signer); -TString MakeDeleteExternalDataSourceQuery( - const FederatedQuery::ConnectionContent& connectionContent, - const TSigner::TPtr& signer); +TString MakeDeleteExternalDataSourceQuery(const TString& sourceName); + +TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content, + const TString& connectionName); TString MakeDeleteExternalDataTableQuery(const TString& tableName); diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp index 8352a5b820c..583c0285754 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -166,10 +166,11 @@ public: hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle); ) - void FinishSuccessfully() { + void FinishSuccessfully(bool isAlreadyExistSuccessStatus) { CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " << TBase::SelfId()); Request->Get()->ComputeYDBOperationWasPerformed = true; + Request->Get()->ComputeYDBIsAlreadyExistFlag = isAlreadyExistSuccessStatus; TBase::SendRequestToSender(); } @@ -188,13 +189,17 @@ public: void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { const auto& executeSchemeQueryStatus = event->Get()->Result; auto isRollback = event->Get()->Rollback; - auto successExecutionRunMode = IsSuccess(executeSchemeQueryStatus) && !isRollback; - auto successExecutionRollbackMode = IsSuccess(executeSchemeQueryStatus) && isRollback; - auto failedExecutionRunMode = !IsSuccess(executeSchemeQueryStatus) && !isRollback; + auto isAlreadyExistSuccessStatus = + IsAlreadyExistSuccessStatus(executeSchemeQueryStatus); + auto isExecuteStatementSuccessful = + executeSchemeQueryStatus.IsSuccess() || isAlreadyExistSuccessStatus; + auto successExecutionRunMode = isExecuteStatementSuccessful && !isRollback; + auto successExecutionRollbackMode = isExecuteStatementSuccessful && isRollback; + auto failedExecutionRunMode = !isExecuteStatementSuccessful && !isRollback; if (successExecutionRunMode) { if (!InitiateSchemaQueryExecution(event->Get()->TaskIndex + 1, false, Nothing())) { - FinishSuccessfully(); + FinishSuccessfully(isAlreadyExistSuccessStatus); return; } } else if (successExecutionRollbackMode) { @@ -236,8 +241,10 @@ public: } private: - bool IsSuccess(const TStatus& status) const { - return status.IsSuccess() || (SuccessOnAlreadyExists && (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || status.GetIssues().ToOneLineString().Contains("error: path exist"))); + bool IsAlreadyExistSuccessStatus(const TStatus& status) const { + return SuccessOnAlreadyExists && + (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || + status.GetIssues().ToOneLineString().Contains("error: path exist")); } private: @@ -258,10 +265,28 @@ NActors::IActor* MakeCreateConnectionActor( auto queryFactoryMethod = [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), signer = std::move(signer)]( - const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> TString { - return MakeCreateExternalDataSourceQuery(request->Get()->Request.content(), - objectStorageEndpoint, - signer); + const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) + -> std::vector<TSchemaQueryTask> { + auto& connectionContent = request->Get()->Request.content(); + + auto createSecretStatement = + CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(), + connectionContent.name(), + signer); + + std::vector<TSchemaQueryTask> statements; + if (createSecretStatement) { + statements.push_back( + TSchemaQueryTask{.SQL = *createSecretStatement, + .RollbackSQL = DropSecretObjectQuery( + connectionContent.setting().object_storage().auth(), + connectionContent.name(), + signer)}); + } + statements.push_back( + TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery( + connectionContent, objectStorageEndpoint, signer)}}); + return statements; }; auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { @@ -298,58 +323,86 @@ NActors::IActor* MakeModifyConnectionActor( using namespace fmt::literals; auto& oldConnectionContent = (*request->Get()->OldConnectionContent); + auto& oldBindings = request->Get()->OldBindingContents; auto& newConnectionContent = request->Get()->Request.content(); - auto deleteOldEntities = fmt::format( - R"( - {delete_external_data_tables}; - {delete_external_data_source}; - )", - "delete_external_data_tables"_a = - JoinMapRange("\n", - request->Get()->OldBindingContents.begin(), - request->Get()->OldBindingContents.end(), - [](const FederatedQuery::BindingContent& binding) { - return MakeDeleteExternalDataTableQuery(binding.name()); - }), - "delete_external_data_source"_a = - MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer)); - - auto createOldEntities = fmt::format( - R"( - {create_external_data_source}; - {create_external_data_tables}; - )", - "create_external_data_source"_a = MakeCreateExternalDataSourceQuery( - oldConnectionContent, objectStorageEndpoint, signer), - "create_external_data_tables"_a = JoinMapRange( - "\n", - request->Get()->OldBindingContents.begin(), - request->Get()->OldBindingContents.end(), - [&oldConnectionContent](const FederatedQuery::BindingContent& binding) { - return MakeCreateExternalDataTableQuery(binding, - oldConnectionContent.name()); - })); - - auto createNewEntities = fmt::format( - R"( - {create_external_data_source}; - {create_external_data_tables}; - )", - "create_external_data_source"_a = MakeCreateExternalDataSourceQuery( - newConnectionContent, objectStorageEndpoint, signer), - "create_external_data_tables"_a = JoinMapRange( - "\n", - request->Get()->OldBindingContents.begin(), - request->Get()->OldBindingContents.end(), - [&newConnectionContent](const FederatedQuery::BindingContent& binding) { - return MakeCreateExternalDataTableQuery(binding, - newConnectionContent.name()); - })); - - return {TSchemaQueryTask{.SQL = TString{deleteOldEntities}, - .RollbackSQL = TString{createOldEntities}}, - TSchemaQueryTask{.SQL = TString{createNewEntities}}}; + auto dropOldSecret = + DropSecretObjectQuery(oldConnectionContent.setting().object_storage().auth(), + oldConnectionContent.name(), + signer); + auto createNewSecret = + CreateSecretObjectQuery(newConnectionContent.setting().object_storage().auth(), + newConnectionContent.name(), + signer); + std::vector<TSchemaQueryTask> statements; + + if (!oldBindings.empty()) { + statements.push_back(TSchemaQueryTask{ + .SQL = JoinMapRange("\n", + oldBindings.begin(), + oldBindings.end(), + [](const FederatedQuery::BindingContent& binding) { + return MakeDeleteExternalDataTableQuery( + binding.name()); + }), + .RollbackSQL = JoinMapRange( + "\n", + oldBindings.begin(), + oldBindings.end(), + [&oldConnectionContent](const FederatedQuery::BindingContent& binding) { + return MakeCreateExternalDataTableQuery(binding, + oldConnectionContent.name()); + })}); + }; + + statements.push_back(TSchemaQueryTask{ + .SQL = TString{MakeDeleteExternalDataSourceQuery(oldConnectionContent.name())}, + .RollbackSQL = TString{MakeCreateExternalDataSourceQuery( + oldConnectionContent, objectStorageEndpoint, signer)}}); + + if (dropOldSecret) { + statements.push_back( + TSchemaQueryTask{.SQL = *dropOldSecret, + .RollbackSQL = CreateSecretObjectQuery( + oldConnectionContent.setting().object_storage().auth(), + oldConnectionContent.name(), + signer)}); + } + if (createNewSecret) { + statements.push_back( + TSchemaQueryTask{.SQL = *createNewSecret, + .RollbackSQL = DropSecretObjectQuery( + newConnectionContent.setting().object_storage().auth(), + newConnectionContent.name(), + signer)}); + } + + statements.push_back( + TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery( + newConnectionContent, objectStorageEndpoint, signer)}, + .RollbackSQL = TString{MakeDeleteExternalDataSourceQuery( + newConnectionContent.name())}}); + + if (!oldBindings.empty()) { + statements.push_back(TSchemaQueryTask{ + .SQL = JoinMapRange("\n", + oldBindings.begin(), + oldBindings.end(), + [&newConnectionContent]( + const FederatedQuery::BindingContent& binding) { + return MakeCreateExternalDataTableQuery( + binding, newConnectionContent.name()); + }), + .RollbackSQL = + JoinMapRange("\n", + request->Get()->OldBindingContents.begin(), + request->Get()->OldBindingContents.end(), + [](const FederatedQuery::BindingContent& binding) { + return MakeDeleteExternalDataTableQuery(binding.name()); + })}); + }; + + return statements; }; auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { @@ -372,20 +425,39 @@ NActors::IActor* MakeDeleteConnectionActor( TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, + const NConfig::TCommonConfig& commonConfig, TSigner::TPtr signer) { auto queryFactoryMethod = - [signer = std::move(signer)]( - const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request) -> TString { - return MakeDeleteExternalDataSourceQuery(*request->Get()->ConnectionContent, - signer); + [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), + signer = std::move(signer)]( + const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request) + -> std::vector<TSchemaQueryTask> { + auto& connectionContent = *request->Get()->ConnectionContent; + + auto dropSecret = + DropSecretObjectQuery(connectionContent.setting().object_storage().auth(), + connectionContent.name(), + signer); + + std::vector<TSchemaQueryTask> statements = {TSchemaQueryTask{ + .SQL = TString{MakeDeleteExternalDataSourceQuery(connectionContent.name())}, + .RollbackSQL = MakeCreateExternalDataSourceQuery(connectionContent, + objectStorageEndpoint, + signer)}}; + if (dropSecret) { + statements.push_back( + TSchemaQueryTask{.SQL = *dropSecret, + .RollbackSQL = CreateSecretObjectQuery( + connectionContent.setting().object_storage().auth(), + connectionContent.name(), + signer)}); + } + return statements; }; auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { - return "External data source with such name already exists"; - } else { - return "Couldn't delete external data source in YDB"; - } + Y_UNUSED(queryStatus); + return "Couldn't delete external data source in YDB"; }; return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest, @@ -398,6 +470,39 @@ NActors::IActor* MakeDeleteConnectionActor( errorMessageFactoryMethod); } +NActors::IActor* MakeDropCreateConnectionActor( + const TActorId& proxyActorId, + TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + const NConfig::TCommonConfig& commonConfig, + TSigner::TPtr signer) { + auto queryFactoryMethod = + [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), + signer = std::move(signer)]( + const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) + -> std::vector<TSchemaQueryTask> { + auto& connectionContent = request->Get()->Request.content(); + return {TSchemaQueryTask{.SQL = TString{MakeDeleteExternalDataSourceQuery( + connectionContent.name())}}, + TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery( + connectionContent, objectStorageEndpoint, signer)}}}; + }; + auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { + Y_UNUSED(queryStatus); + return "Couldn't recreate external source in YDB"; + }; + + return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest, + TEvControlPlaneProxy::TEvCreateConnectionResponse>( + proxyActorId, + std::move(request), + requestTimeout, + counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB), + queryFactoryMethod, + errorMessageFactoryMethod); +} + /// Bindings actors NActors::IActor* MakeCreateBindingActor( const TActorId& proxyActorId, @@ -477,11 +582,8 @@ NActors::IActor* MakeDeleteBindingActor( }; auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { - return "External data source with such name already exists"; - } else { - return "Couldn't delete external data source in YDB"; - } + Y_UNUSED(queryStatus); + return "Couldn't delete external data source in YDB"; }; return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteBindingRequest, diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h index 36b7a06370b..0df17eca66d 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h @@ -10,11 +10,9 @@ namespace NFq { namespace NPrivate { -using namespace NActors; - /// Connection manipulation actors NActors::IActor* MakeCreateConnectionActor( - const TActorId& proxyActorId, + const NActors::TActorId& proxyActorId, TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, @@ -23,7 +21,7 @@ NActors::IActor* MakeCreateConnectionActor( bool successOnAlreadyExists = false); NActors::IActor* MakeModifyConnectionActor( - const TActorId& proxyActorId, + const NActors::TActorId& proxyActorId, TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, @@ -31,28 +29,37 @@ NActors::IActor* MakeModifyConnectionActor( TSigner::TPtr signer); NActors::IActor* MakeDeleteConnectionActor( - const TActorId& proxyActorId, + const NActors::TActorId& proxyActorId, TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, + const NConfig::TCommonConfig& commonConfig, + TSigner::TPtr signer); + +NActors::IActor* MakeDropCreateConnectionActor( + const NActors::TActorId& proxyActorId, + TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + const NConfig::TCommonConfig& commonConfig, TSigner::TPtr signer); /// Binding manipulation actors NActors::IActor* MakeCreateBindingActor( - const TActorId& proxyActorId, + const NActors::TActorId& proxyActorId, TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, TDuration requestTimeout, TCounters& counters, bool successOnAlreadyExists = false); NActors::IActor* MakeModifyBindingActor( - const TActorId& proxyActorId, + const NActors::TActorId& proxyActorId, TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request, TDuration requestTimeout, TCounters& counters); NActors::IActor* MakeDeleteBindingActor( - const TActorId& proxyActorId, + const NActors::TActorId& proxyActorId, TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request, TDuration requestTimeout, TCounters& counters); diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp index c2db1d6771e..3022997a06a 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -1254,7 +1254,6 @@ private: CPP_LOG_T("CreateConnectionRequest: " << request.DebugString()); const TString cloudId = ev->Get()->CloudId; const TString subjectType = ev->Get()->SubjectType; - const bool ydbOperationWasPerformed = ev->Get()->ComputeYDBOperationWasPerformed; const TString scope = ev->Get()->Scope; TString user = ev->Get()->User; TString token = ev->Get()->Token; @@ -1310,7 +1309,10 @@ private: return; } - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->RequestValidationPassed) { + const auto isYDBOperationEnabled = + Config.ComputeConfig.IsYDBSchemaOperationsEnabled( + ev->Get()->Scope, ev->Get()->Request.content().setting().connection_case()); + if (isYDBOperationEnabled && !ev->Get()->RequestValidationPassed) { auto requestValidationIssues = ::NFq::ValidateConnection(ev, Config.StorageConfig.Proto.GetMaxRequestSize(), @@ -1340,7 +1342,7 @@ private: TPermissions::TPermission::MANAGE_PUBLIC }; - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ydbOperationWasPerformed) { + if (isYDBOperationEnabled) { if (!ev->Get()->YDBClient) { ev->Get()->YDBClient = CreateNewTableClient(ev, Config.ComputeConfig, @@ -1348,14 +1350,55 @@ private: CredentialsProviderFactory); } - Register(NPrivate::MakeCreateConnectionActor( - ControlPlaneProxyActorId(), - std::move(ev), - Config.RequestTimeout, - Counters, - Config.CommonConfig, - Signer)); - return; + if (!ev->Get()->ComputeYDBOperationWasPerformed && !ev->Get()->ComputeYDBIsAlreadyExistFlag) { + Register(NPrivate::MakeCreateConnectionActor(ControlPlaneProxyActorId(), + std::move(ev), + Config.RequestTimeout, + Counters, + Config.CommonConfig, + Signer, + true)); + return; + } + + if (ev->Get()->ComputeYDBIsAlreadyExistFlag && !ev->Get()->CPSListingFinished) { + ev->Get()->ComputeYDBOperationWasPerformed = false; + Register(MakeListConnectionActor(ControlPlaneProxyActorId(), + std::move(ev), + Counters, + Config.RequestTimeout, + availablePermissions)); + return; + } + + if (ev->Get()->ComputeYDBIsAlreadyExistFlag && + !ev->Get()->ComputeYDBOperationWasPerformed) { + if (ev->Get()->CPSConnectionCount != 0) { + CPS_LOG_E("CreateConnectionRequest, Connection with such name already exists: " + << scope << " " << user << " " << NKikimr::MaskTicket(token) + << " " << request.DebugString()); + Send(ev->Sender, + new TEvControlPlaneProxy::TEvCreateConnectionResponse( + NYql::TIssues{ + NYql::TIssue{"Connection with such name already exists"}}, + subjectType), + 0, + ev->Cookie); + requestCounters.IncError(); + TDuration delta = TInstant::Now() - startTime; + requestCounters.Common->LatencyMs->Collect(delta.MilliSeconds()); + probe(delta, false, false); + return; + } + + Register(NPrivate::MakeDropCreateConnectionActor(ControlPlaneProxyActorId(), + std::move(ev), + Config.RequestTimeout, + Counters, + Config.CommonConfig, + Signer)); + return; + } } Register(new TRequestActor<FederatedQuery::CreateConnectionRequest, @@ -1564,8 +1607,10 @@ private: return; } - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && - !ev->Get()->RequestValidationPassed) { + const auto isYDBOperationEnabled = Config.ComputeConfig.IsYDBSchemaOperationsEnabled( + ev->Get()->Scope, ev->Get()->Request.content().setting().connection_case()); + + if (isYDBOperationEnabled && !ev->Get()->RequestValidationPassed) { auto requestValidationIssues = ::NFq::ValidateConnection(ev, Config.StorageConfig.Proto.GetMaxRequestSize(), @@ -1598,28 +1643,25 @@ private: | TPermissions::TPermission::VIEW_PRIVATE }; - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->OldConnectionContent) { + if (isYDBOperationEnabled && !ev->Get()->OldConnectionContent) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeDiscoverYDBConnectionName( + Register(MakeDiscoverYDBConnectionNameActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->OldBindingNamesDiscoveryFinished) { + if (isYDBOperationEnabled && !ev->Get()->OldBindingNamesDiscoveryFinished) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeListBindingIds( + Register(MakeListBindingIdsActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && ev->Get()->OldBindingIds.size() != ev->Get()->OldBindingContents.size()) { + if (isYDBOperationEnabled && ev->Get()->OldBindingIds.size() != ev->Get()->OldBindingContents.size()) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeDescribeListedBinding( + Register(MakeDescribeListedBindingActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } - const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed; - if (!controlPlaneYDBOperationWasPerformed) { - auto shouldReplyWithResponse = - !Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope); + if (!ev->Get()->ControlPlaneYDBOperationWasPerformed) { Register(new TRequestActor<FederatedQuery::ModifyConnectionRequest, TEvControlPlaneStorage::TEvModifyConnectionRequest, TEvControlPlaneStorage::TEvModifyConnectionResponse, @@ -1631,11 +1673,11 @@ private: requestCounters, probe, availablePermissions, - shouldReplyWithResponse)); + !isYDBOperationEnabled)); return; } - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope)) { + if (isYDBOperationEnabled) { if (!ev->Get()->YDBClient) { ev->Get()->YDBClient = CreateNewTableClient(ev, Config.ComputeConfig, @@ -1725,15 +1767,19 @@ private: if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->ConnectionContent) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeDiscoverYDBConnectionName( + Register(MakeDiscoverYDBConnectionNameActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } - const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed; - if (!controlPlaneYDBOperationWasPerformed) { - auto shouldReplyWithResponse = - !Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope); + const auto isYDBOperationEnabled = + ev->Get()->ConnectionContent + ? Config.ComputeConfig.IsYDBSchemaOperationsEnabled( + ev->Get()->Scope, + ev->Get()->ConnectionContent->setting().connection_case()) + : false; + + if (!ev->Get()->ControlPlaneYDBOperationWasPerformed) { Register(new TRequestActor<FederatedQuery::DeleteConnectionRequest, TEvControlPlaneStorage::TEvDeleteConnectionRequest, TEvControlPlaneStorage::TEvDeleteConnectionResponse, @@ -1745,11 +1791,11 @@ private: requestCounters, probe, availablePermissions, - shouldReplyWithResponse)); + !isYDBOperationEnabled)); return; } - if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope)) { + if (isYDBOperationEnabled) { if (!ev->Get()->YDBClient) { ev->Get()->YDBClient = CreateNewTableClient(ev, Config.ComputeConfig, @@ -1758,8 +1804,12 @@ private: } if (!ev->Get()->ComputeYDBOperationWasPerformed) { - Register(MakeDeleteConnectionActor( - ControlPlaneProxyActorId(), ev, Config.RequestTimeout, Counters, Signer)); + Register(MakeDeleteConnectionActor(ControlPlaneProxyActorId(), + ev, + Config.RequestTimeout, + Counters, + Config.CommonConfig, + Signer)); return; } @@ -1923,7 +1973,7 @@ private: if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ydbOperationWasPerformed) { if (!ev->Get()->ConnectionName) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeDiscoverYDBConnectionName( + Register(MakeDiscoverYDBConnectionNameActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } @@ -2177,13 +2227,13 @@ private: if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope)) { if (!ev->Get()->OldBindingContent) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeDiscoverYDBBindingName( + Register(MakeDiscoverYDBBindingNameActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } if (!ev->Get()->ConnectionName) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeDiscoverYDBConnectionName( + Register(MakeDiscoverYDBConnectionNameActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } @@ -2296,7 +2346,7 @@ private: if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->OldBindingName) { auto permissions = ExtractPermissions(ev, availablePermissions); - Register(MakeDiscoverYDBBindingName( + Register(MakeDiscoverYDBBindingNameActor( ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions)); return; } diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h index 28d61bf2ba1..ebd103ae226 100644 --- a/ydb/core/fq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h @@ -301,6 +301,25 @@ struct TEvControlPlaneProxy { }; template<> + struct TControlPlaneRequest<FederatedQuery::CreateConnectionRequest, + EvCreateConnectionRequest> : + public TBaseControlPlaneRequest<TEvCreateConnectionRequest, + FederatedQuery::CreateConnectionRequest, + EvCreateConnectionRequest> { + using TBaseControlPlaneRequest< + TControlPlaneRequest<FederatedQuery::CreateConnectionRequest, EvCreateConnectionRequest>, + FederatedQuery::CreateConnectionRequest, + EvCreateConnectionRequest>::TBaseControlPlaneRequest; + + bool ComputeYDBIsAlreadyExistFlag = false; + // Check that connection does not exist in CPS + bool CPSListingFinished = false; + size_t CPSConnectionCount = 0; + // ?? + bool IsInDeleteCreateState = false; + }; + + template<> struct TControlPlaneRequest<FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest> : public TBaseControlPlaneRequest<TEvModifyConnectionRequest, FederatedQuery::ModifyConnectionRequest, @@ -310,6 +329,7 @@ struct TEvControlPlaneProxy { FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest>::TBaseControlPlaneRequest; + bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent; // ListBindings bool OldBindingNamesDiscoveryFinished = false; @@ -329,6 +349,7 @@ struct TEvControlPlaneProxy { FederatedQuery::DeleteConnectionRequest, EvDeleteConnectionRequest>::TBaseControlPlaneRequest; + bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<FederatedQuery::ConnectionContent> ConnectionContent; }; @@ -342,6 +363,7 @@ struct TEvControlPlaneProxy { FederatedQuery::CreateBindingRequest, EvCreateBindingRequest>::TBaseControlPlaneRequest; + bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<TString> ConnectionName; }; @@ -355,6 +377,7 @@ struct TEvControlPlaneProxy { FederatedQuery::ModifyBindingRequest, EvModifyBindingRequest>::TBaseControlPlaneRequest; + bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<FederatedQuery::BindingContent> OldBindingContent; TMaybe<TString> ConnectionName; }; @@ -369,6 +392,7 @@ struct TEvControlPlaneProxy { FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest>::TBaseControlPlaneRequest; + bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<TString> OldBindingName; }; }; diff --git a/ydb/core/fq/libs/control_plane_storage/events/events.h b/ydb/core/fq/libs/control_plane_storage/events/events.h index bc47f475585..5ef1b4bc5dd 100644 --- a/ydb/core/fq/libs/control_plane_storage/events/events.h +++ b/ydb/core/fq/libs/control_plane_storage/events/events.h @@ -175,19 +175,20 @@ struct TEvControlPlaneStorage { static_assert(EvEnd <= YqEventSubspaceEnd(NFq::TYqEventSubspace::ControlPlaneStorage), "All events must be in their subspace"); - template<typename ProtoMessage, ui32 EventType> - struct TControlPlaneRequest : NActors::TEventLocal<TControlPlaneRequest<ProtoMessage, EventType>, EventType> { + template<typename TDerived, typename ProtoMessage, ui32 EventType> + struct TBaseControlPlaneRequest : NActors::TEventLocal<TDerived, EventType> { using TProto = ProtoMessage; - explicit TControlPlaneRequest(const TString& scope, - const ProtoMessage& request, - const TString& user, - const TString& token, - const TString& cloudId, - TPermissions permissions, - TMaybe<TQuotaMap> quotas, - TTenantInfo::TPtr tenantInfo, - const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase) + explicit TBaseControlPlaneRequest( + const TString& scope, + const ProtoMessage& request, + const TString& user, + const TString& token, + const TString& cloudId, + TPermissions permissions, + TMaybe<TQuotaMap> quotas, + TTenantInfo::TPtr tenantInfo, + const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase) : Scope(scope) , Request(request) , User(user) @@ -196,9 +197,7 @@ struct TEvControlPlaneStorage { , Permissions(permissions) , Quotas(std::move(quotas)) , TenantInfo(tenantInfo) - , ComputeDatabase(computeDatabase) - { - } + , ComputeDatabase(computeDatabase) { } size_t GetByteSize() const { return sizeof(*this) @@ -220,6 +219,50 @@ struct TEvControlPlaneStorage { FederatedQuery::Internal::ComputeDatabaseInternal ComputeDatabase; }; + template<typename TProtoMessage, ui32 EventType> + struct TControlPlaneRequest : + TBaseControlPlaneRequest<TControlPlaneRequest<TProtoMessage, EventType>, + TProtoMessage, + EventType> { + using TBaseControlPlaneRequest<TControlPlaneRequest<TProtoMessage, EventType>, + TProtoMessage, + EventType>::TBaseControlPlaneRequest; + }; + + template<typename ProtoMessage, ui32 EventType> + struct TControlPlaneListRequest : + public TBaseControlPlaneRequest<TControlPlaneListRequest<ProtoMessage, EventType>, + ProtoMessage, + EventType> { + using TProto = ProtoMessage; + + explicit TControlPlaneListRequest( + const TString& scope, + const ProtoMessage& request, + const TString& user, + const TString& token, + const TString& cloudId, + TPermissions permissions, + TMaybe<TQuotaMap> quotas, + TTenantInfo::TPtr tenantInfo, + const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase, + bool isExactNameMatch = false) + : TBaseControlPlaneRequest<TControlPlaneListRequest<ProtoMessage, EventType>, + ProtoMessage, + EventType>(scope, + request, + user, + token, + cloudId, + permissions, + std::move(quotas), + tenantInfo, + computeDatabase) + , IsExactNameMatch(isExactNameMatch) { } + + bool IsExactNameMatch = false; + }; + template<typename TDerived, typename ProtoMessage, ui32 EventType> struct TControlPlaneResponse : NActors::TEventLocal<TDerived, EventType> { using TProto = ProtoMessage; @@ -326,7 +369,7 @@ struct TEvControlPlaneStorage { using TEvDescribeJobResponse = TControlPlaneNonAuditableResponse<FederatedQuery::DescribeJobResult, EvDescribeJobResponse>; using TEvCreateConnectionRequest = TControlPlaneRequest<FederatedQuery::CreateConnectionRequest, EvCreateConnectionRequest>; using TEvCreateConnectionResponse = TControlPlaneAuditableResponse<FederatedQuery::CreateConnectionResult, FederatedQuery::Connection, EvCreateConnectionResponse>; - using TEvListConnectionsRequest = TControlPlaneRequest<FederatedQuery::ListConnectionsRequest, EvListConnectionsRequest>; + using TEvListConnectionsRequest = TControlPlaneListRequest<FederatedQuery::ListConnectionsRequest, EvListConnectionsRequest>; using TEvListConnectionsResponse = TControlPlaneNonAuditableResponse<FederatedQuery::ListConnectionsResult, EvListConnectionsResponse>; using TEvDescribeConnectionRequest = TControlPlaneRequest<FederatedQuery::DescribeConnectionRequest, EvDescribeConnectionRequest>; using TEvDescribeConnectionResponse = TControlPlaneNonAuditableResponse<FederatedQuery::DescribeConnectionResult, EvDescribeConnectionResponse>; @@ -336,7 +379,7 @@ struct TEvControlPlaneStorage { using TEvDeleteConnectionResponse = TControlPlaneAuditableResponse<FederatedQuery::DeleteConnectionResult, FederatedQuery::Connection, EvDeleteConnectionResponse>; using TEvCreateBindingRequest = TControlPlaneRequest<FederatedQuery::CreateBindingRequest, EvCreateBindingRequest>; using TEvCreateBindingResponse = TControlPlaneAuditableResponse<FederatedQuery::CreateBindingResult, FederatedQuery::Binding, EvCreateBindingResponse>; - using TEvListBindingsRequest = TControlPlaneRequest<FederatedQuery::ListBindingsRequest, EvListBindingsRequest>; + using TEvListBindingsRequest = TControlPlaneListRequest<FederatedQuery::ListBindingsRequest, EvListBindingsRequest>; using TEvListBindingsResponse = TControlPlaneNonAuditableResponse<FederatedQuery::ListBindingsResult, EvListBindingsResponse>; using TEvDescribeBindingRequest = TControlPlaneRequest<FederatedQuery::DescribeBindingRequest, EvDescribeBindingRequest>; using TEvDescribeBindingResponse = TControlPlaneNonAuditableResponse<FederatedQuery::DescribeBindingResult, EvDescribeBindingResponse>; diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp index 3e134579bd0..ddeaf865d23 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp @@ -214,7 +214,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListBinding if (request.filter().name()) { queryBuilder.AddString("filter_name", request.filter().name()); - filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'"); + if (event.IsExactNameMatch) { + filters.push_back("`" NAME_COLUMN_NAME "` = '$filter_name'"); + } else { + filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'"); + } } if (request.filter().created_by_me()) { diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp index 8edb3dc7010..412b5b28ea1 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp @@ -204,7 +204,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListConnect TVector<TString> filters; if (request.filter().name()) { queryBuilder.AddString("filter_name", request.filter().name()); - filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'"); + if (event.IsExactNameMatch) { + filters.push_back("`" NAME_COLUMN_NAME "` = '$filter_name'"); + } else { + filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'"); + } } if (request.filter().created_by_me()) { |