aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <auzhegov@yandex-team.com>2023-08-22 19:01:00 +0300
committerauzhegov <auzhegov@yandex-team.com>2023-08-22 19:22:23 +0300
commita42171b3cf1fa15a4276d31465f2b8621c9fbc86 (patch)
tree2ef8e27db4a81640c610bdd77cdc64034a07a945
parent769d14120ef8e30363c7dd6870ce1b82552587c3 (diff)
downloadydb-a42171b3cf1fa15a4276d31465f2b8621c9fbc86.tar.gz
Green yqv2-dev SLI
Initial implementation
-rw-r--r--ydb/core/fq/libs/compute/common/config.h23
-rw-r--r--ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp12
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp115
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h23
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp39
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h16
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp252
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h23
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp126
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/events/events.h24
-rw-r--r--ydb/core/fq/libs/control_plane_storage/events/events.h75
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp6
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp6
13 files changed, 519 insertions, 221 deletions
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h
index a332189cb21..b0616b1bff0 100644
--- a/ydb/core/fq/libs/compute/common/config.h
+++ b/ydb/core/fq/libs/compute/common/config.h
@@ -5,7 +5,6 @@
#include <util/digest/multi.h>
#include <util/generic/algorithm.h>
-#include <util/generic/algorithm.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
@@ -103,10 +102,32 @@ public:
NFq::NConfig::EComputeType::YDB);
}
+ bool IsYDBSchemaOperationsEnabled(
+ const TString& scope,
+ const FederatedQuery::ConnectionSetting::ConnectionCase& connectionCase) const {
+ return IsConnectionCaseEnabled(connectionCase) &&
+ YdbComputeControlPlaneEnabled(scope);
+ }
+
const NFq::NConfig::TComputeConfig& GetProto() const {
return ComputeConfig;
}
+ bool IsConnectionCaseEnabled(
+ const FederatedQuery::ConnectionSetting::ConnectionCase& connectionCase) const {
+ switch (connectionCase) {
+ case FederatedQuery::ConnectionSetting::kObjectStorage:
+ return true;
+ case FederatedQuery::ConnectionSetting::kYdbDatabase:
+ case FederatedQuery::ConnectionSetting::kClickhouseCluster:
+ case FederatedQuery::ConnectionSetting::kDataStreams:
+ case FederatedQuery::ConnectionSetting::kMonitoring:
+ case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
+ case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+ return false;
+ }
+ }
+
private:
NFq::NConfig::TComputeConfig ComputeConfig;
NFq::NConfig::EComputeType DefaultCompute;
diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
index 3480c506d9b..be2c5c3ea43 100644
--- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
@@ -313,18 +313,10 @@ private:
const auto& meta = connection.meta();
const auto& content = connection.content();
const auto& setting = content.setting();
- switch (setting.connection_case()) {
- case FederatedQuery::ConnectionSetting::kObjectStorage:
- break;
- case FederatedQuery::ConnectionSetting::kYdbDatabase:
- case FederatedQuery::ConnectionSetting::kClickhouseCluster:
- case FederatedQuery::ConnectionSetting::kDataStreams:
- case FederatedQuery::ConnectionSetting::kMonitoring:
- case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
- case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
+
+ if (!ComputeConfig.IsConnectionCaseEnabled(setting.connection_case())) {
LOG_I("Exclude connection by type: scope = " << Scope << " , id = " << meta.id() << ", type = " << static_cast<int>(setting.connection_case()));
excludeIds.push_back(meta.id());
- break;
}
switch (content.acl().visibility()) {
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
index 69adc07df2a..d9666ad965c 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
@@ -44,28 +44,37 @@ private:
using TEventRequestPtr = typename TEventRequest::TPtr;
public:
- using TCPSRequestFactory =
+ using TCPSProtoRequestFactory =
std::function<typename TCPSEventRequest::TProto(const TEventRequestPtr& request)>;
- using TErrorMessageFactoryMethod = std::function<TString(const NYql::TIssues& issues)>;
- using TEntityNameExtractorFactoryMethod =
+ using TErrorMessageFactory = std::function<TString(const NYql::TIssues& issues)>;
+ using TCPSEventRequestPostProcessor = std::function<void(TCPSEventRequest& eventRequest)>;
+ using TResultHandler =
std::function<void(const TEventRequestPtr& request,
const typename TCPSEventResponse::TProto& response)>;
-
- TControlPlaneStorageRequesterActor(const TActorId& proxyActorId,
- const TEventRequestPtr request,
- TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters,
- TPermissions permissions,
- TCPSRequestFactory cpsRequestFactory,
- TErrorMessageFactoryMethod errorMessageFactoryMethod,
- TEntityNameExtractorFactoryMethod entityNameExtractorFactoryMethod)
- : TBaseActor<
- TControlPlaneStorageRequesterActor<TEventRequest, TEventResponse, TCPSEventRequest, TCPSEventResponse>>(
+ using TIssuesHandler = std::function<void(const TEventRequestPtr& request,
+ const typename NYql::TIssues& issues)>;
+
+ TControlPlaneStorageRequesterActor(
+ const TActorId& proxyActorId,
+ const TEventRequestPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters,
+ TPermissions permissions,
+ TCPSProtoRequestFactory cpsProtoRequestFactory,
+ TErrorMessageFactory errorMessageFactory,
+ TResultHandler resultHandler,
+ TCPSEventRequestPostProcessor cpsEventRequestPostProcessor =
+ std::function([](TCPSEventRequest& a) { Y_UNUSED(a); }))
+ : TBaseActor<TControlPlaneStorageRequesterActor<TEventRequest,
+ TEventResponse,
+ TCPSEventRequest,
+ TCPSEventResponse>>(
proxyActorId, std::move(request), requestTimeout, counters)
, Permissions(permissions)
- , CPSRequestFactory(cpsRequestFactory)
- , ErrorMessageFactoryMethod(errorMessageFactoryMethod)
- , EntityNameExtractorFactoryMethod(entityNameExtractorFactoryMethod) { }
+ , CPSProtoRequestFactory(cpsProtoRequestFactory)
+ , CPSEventRequestPostProcessor(cpsEventRequestPostProcessor)
+ , ErrorMessageFactory(errorMessageFactory)
+ , ResultHandler(resultHandler) { }
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_CONTROL_PLANE_STORAGE";
@@ -77,7 +86,7 @@ public:
CPP_LOG_I("TControlPlaneStorageRequesterActor Sending CPS request. Actor id: " << TBase::SelfId());
const auto& request = Request;
auto event = new TCPSEventRequest(request->Get()->Scope,
- CPSRequestFactory(request),
+ CPSProtoRequestFactory(request),
request->Get()->User,
request->Get()->Token,
request->Get()->CloudId,
@@ -85,6 +94,7 @@ public:
request->Get()->Quotas,
request->Get()->TenantInfo,
{});
+ CPSEventRequestPostProcessor(*event);
Send(ControlPlaneStorageServiceActorId(), event);
}
@@ -98,21 +108,22 @@ public:
auto issues = event->Get()->Issues;
if (!issues.Empty()) {
CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished with issues. Actor id: " << TBase::SelfId());
- TString errorMessage = ErrorMessageFactoryMethod(issues);
+ TString errorMessage = ErrorMessageFactory(issues);
TBase::HandleError(errorMessage, issues);
return;
}
CPP_LOG_I("TControlPlaneStorageRequesterActor Handling CPS response. Request finished successfully. Actor id: " << TBase::SelfId());
- EntityNameExtractorFactoryMethod(Request, event->Get()->Result);
+ ResultHandler(Request, event->Get()->Result);
TBase::SendRequestToSender();
}
private:
TPermissions Permissions;
- TCPSRequestFactory CPSRequestFactory;
- TErrorMessageFactoryMethod ErrorMessageFactoryMethod;
- TEntityNameExtractorFactoryMethod EntityNameExtractorFactoryMethod;
+ TCPSProtoRequestFactory CPSProtoRequestFactory;
+ TCPSEventRequestPostProcessor CPSEventRequestPostProcessor;
+ TErrorMessageFactory ErrorMessageFactory;
+ TResultHandler ResultHandler;
};
/// Discover connection_name
@@ -120,7 +131,49 @@ TString DescribeConnectionErrorMessageFactoryMethod(const NYql::TIssues& issues)
Y_UNUSED(issues);
return "Couldn't resolve connection";
};
-NActors::IActor* MakeDiscoverYDBConnectionName(
+
+NActors::IActor* MakeListConnectionActor(
+ const TActorId& proxyActorId,
+ const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event) {
+ FederatedQuery::ListConnectionsRequest result;
+ auto connectionName = event->Get()->Request.content().name();
+ result.mutable_filter()->set_name(connectionName);
+ result.set_limit(2);
+ return result;
+ };
+ auto cpsRequestPostProcessor =
+ [](TEvControlPlaneStorage::TEvListConnectionsRequest& event) {
+ event.IsExactNameMatch = true;
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event,
+ const FederatedQuery::ListConnectionsResult& result) {
+ event->Get()->CPSListingFinished = true;
+ event->Get()->CPSConnectionCount = result.connection_size();
+ };
+
+ return new TControlPlaneStorageRequesterActor<
+ TEvControlPlaneProxy::TEvCreateConnectionRequest,
+ TEvControlPlaneProxy::TEvCreateConnectionResponse,
+ TEvControlPlaneStorage::TEvListConnectionsRequest,
+ TEvControlPlaneStorage::TEvListConnectionsResponse>(
+ proxyActorId,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ DescribeConnectionErrorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod,
+ cpsRequestPostProcessor);
+}
+
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request,
TCounters& counters,
@@ -153,7 +206,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName(
entityNameExtractorFactoryMethod);
}
-NActors::IActor* MakeDiscoverYDBConnectionName(
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
TCounters& counters,
@@ -186,7 +239,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName(
entityNameExtractorFactoryMethod);
}
-NActors::IActor* MakeDiscoverYDBConnectionName(
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request,
TCounters& counters,
@@ -219,7 +272,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName(
entityNameExtractorFactoryMethod);
}
-NActors::IActor* MakeDiscoverYDBConnectionName(
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
TCounters& counters,
@@ -254,7 +307,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName(
/// Discover binding_name
-NActors::IActor* MakeDiscoverYDBBindingName(
+NActors::IActor* MakeDiscoverYDBBindingNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
TCounters& counters,
@@ -291,7 +344,7 @@ NActors::IActor* MakeDiscoverYDBBindingName(
entityNameExtractorFactoryMethod);
}
-NActors::IActor* MakeDiscoverYDBBindingName(
+NActors::IActor* MakeDiscoverYDBBindingNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request,
TCounters& counters,
@@ -328,7 +381,7 @@ NActors::IActor* MakeDiscoverYDBBindingName(
entityNameExtractorFactoryMethod);
}
-NActors::IActor* MakeListBindingIds(
+NActors::IActor* MakeListBindingIdsActor(
const TActorId proxyActorId,
const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
TCounters& counters,
@@ -381,7 +434,7 @@ NActors::IActor* MakeListBindingIds(
entityNameExtractorFactoryMethod);
}
-NActors::IActor* MakeDescribeListedBinding(
+NActors::IActor* MakeDescribeListedBindingActor(
const TActorId proxyActorId,
const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
TCounters& counters,
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
index 38b6557e141..60c89e9f95c 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
@@ -11,44 +11,51 @@ namespace NPrivate {
using namespace NActors;
/// Discover connection_name
-NActors::IActor* MakeDiscoverYDBConnectionName(
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request,
TCounters& counters,
TDuration requestTimeout,
TPermissions permissions);
-NActors::IActor* MakeDiscoverYDBConnectionName(
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
TCounters& counters,
TDuration requestTimeout,
TPermissions permissions);
-NActors::IActor* MakeDiscoverYDBConnectionName(
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request,
TCounters& counters,
TDuration requestTimeout,
TPermissions permissions);
-NActors::IActor* MakeDiscoverYDBConnectionName(
+NActors::IActor* MakeDiscoverYDBConnectionNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
TCounters& counters,
TDuration requestTimeout,
TPermissions permissions);
+NActors::IActor* MakeListConnectionActor(
+ const TActorId& proxyActorId,
+ const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
/// Discover binding_name
-NActors::IActor* MakeDiscoverYDBBindingName(
+NActors::IActor* MakeDiscoverYDBBindingNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request,
TCounters& counters,
TDuration requestTimeout,
TPermissions permissions);
-NActors::IActor* MakeDiscoverYDBBindingName(
+NActors::IActor* MakeDiscoverYDBBindingNameActor(
const TActorId& proxyActorId,
const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request,
TCounters& counters,
@@ -57,14 +64,14 @@ NActors::IActor* MakeDiscoverYDBBindingName(
/// ModifyConnection
-NActors::IActor* MakeListBindingIds(
+NActors::IActor* MakeListBindingIdsActor(
const TActorId sender,
const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
TCounters& counters,
TDuration requestTimeout,
TPermissions permissions);
-NActors::IActor* MakeDescribeListedBinding(
+NActors::IActor* MakeDescribeListedBindingActor(
const TActorId sender,
const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
TCounters& counters,
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
index 494e44034aa..c28ea7732a7 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
@@ -92,9 +92,9 @@ TString SignAccountId(const TString& id, const TSigner::TPtr& signer) {
return signer ? signer->SignAccountId(id) : TString{};
}
-TString CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth,
- const TString& name,
- const TSigner::TPtr& signer) {
+TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth,
+ const TString& name,
+ const TSigner::TPtr& signer) {
using namespace fmt::literals;
switch (auth.identity_case()) {
case FederatedQuery::IamAuth::kServiceAccount: {
@@ -152,7 +152,6 @@ TString MakeCreateExternalDataSourceQuery(
return fmt::format(
R"(
- {upsert_object};
CREATE EXTERNAL DATA SOURCE {external_source} WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}"
@@ -161,19 +160,15 @@ TString MakeCreateExternalDataSourceQuery(
)",
"external_source"_a = EncloseAndEscapeString(sourceName, '`'),
"location"_a = objectStorageEndpoint + "/" + EscapeString(bucketName, '"') + "/",
- "upsert_object"_a =
- CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(),
- connectionContent.name(),
- signer),
"auth_params"_a =
CreateAuthParamsQuery(connectionContent.setting().object_storage().auth(),
connectionContent.name(),
signer));
}
-TString DropSecretObjectQuery(const FederatedQuery::IamAuth& auth,
- const TString& name,
- const TSigner::TPtr& signer) {
+TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth,
+ const TString& name,
+ const TSigner::TPtr& signer) {
using namespace fmt::literals;
switch (auth.identity_case()) {
case FederatedQuery::IamAuth::kServiceAccount: {
@@ -192,27 +187,17 @@ TString DropSecretObjectQuery(const FederatedQuery::IamAuth& auth,
}
}
-TString MakeDeleteExternalDataSourceQuery(
- const FederatedQuery::ConnectionContent& connectionContent,
- const TSigner::TPtr& signer) {
- using namespace fmt::literals;
- return fmt::format(
- R"(
- {drop_secret_statement};
- DROP EXTERNAL DATA SOURCE {external_source};
- )",
- "drop_secret_statement"_a =
- DropSecretObjectQuery(connectionContent.setting().object_storage().auth(),
- connectionContent.name(),
- signer),
- "external_source"_a = EncloseAndEscapeString(connectionContent.name(), '`'));
-}
-
TString MakeDeleteExternalDataTableQuery(const TString& tableName) {
using namespace fmt::literals;
return fmt::format("DROP EXTERNAL TABLE {external_table};",
"external_table"_a = EncloseAndEscapeString(tableName, '`'));
}
+TString MakeDeleteExternalDataSourceQuery(const TString& sourceName) {
+ using namespace fmt::literals;
+ return fmt::format("DROP EXTERNAL DATA SOURCE {external_source};",
+ "external_source"_a = EncloseAndEscapeString(sourceName, '`'));
+}
+
} // namespace NPrivate
} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
index 480ee99ee7f..6c4a3ac44ce 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
@@ -7,17 +7,23 @@
namespace NFq {
namespace NPrivate {
-TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
- const TString& connectionName);
+TMaybe<TString> CreateSecretObjectQuery(const FederatedQuery::IamAuth& auth,
+ const TString& name,
+ const TSigner::TPtr& signer);
+
+TMaybe<TString> DropSecretObjectQuery(const FederatedQuery::IamAuth& auth,
+ const TString& name,
+ const TSigner::TPtr& signer);
TString MakeCreateExternalDataSourceQuery(
const FederatedQuery::ConnectionContent& connectionContent,
const TString& objectStorageEndpoint,
const TSigner::TPtr& signer);
-TString MakeDeleteExternalDataSourceQuery(
- const FederatedQuery::ConnectionContent& connectionContent,
- const TSigner::TPtr& signer);
+TString MakeDeleteExternalDataSourceQuery(const TString& sourceName);
+
+TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
+ const TString& connectionName);
TString MakeDeleteExternalDataTableQuery(const TString& tableName);
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
index 8352a5b820c..583c0285754 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -166,10 +166,11 @@ public:
hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle);
)
- void FinishSuccessfully() {
+ void FinishSuccessfully(bool isAlreadyExistSuccessStatus) {
CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: "
<< TBase::SelfId());
Request->Get()->ComputeYDBOperationWasPerformed = true;
+ Request->Get()->ComputeYDBIsAlreadyExistFlag = isAlreadyExistSuccessStatus;
TBase::SendRequestToSender();
}
@@ -188,13 +189,17 @@ public:
void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
const auto& executeSchemeQueryStatus = event->Get()->Result;
auto isRollback = event->Get()->Rollback;
- auto successExecutionRunMode = IsSuccess(executeSchemeQueryStatus) && !isRollback;
- auto successExecutionRollbackMode = IsSuccess(executeSchemeQueryStatus) && isRollback;
- auto failedExecutionRunMode = !IsSuccess(executeSchemeQueryStatus) && !isRollback;
+ auto isAlreadyExistSuccessStatus =
+ IsAlreadyExistSuccessStatus(executeSchemeQueryStatus);
+ auto isExecuteStatementSuccessful =
+ executeSchemeQueryStatus.IsSuccess() || isAlreadyExistSuccessStatus;
+ auto successExecutionRunMode = isExecuteStatementSuccessful && !isRollback;
+ auto successExecutionRollbackMode = isExecuteStatementSuccessful && isRollback;
+ auto failedExecutionRunMode = !isExecuteStatementSuccessful && !isRollback;
if (successExecutionRunMode) {
if (!InitiateSchemaQueryExecution(event->Get()->TaskIndex + 1, false, Nothing())) {
- FinishSuccessfully();
+ FinishSuccessfully(isAlreadyExistSuccessStatus);
return;
}
} else if (successExecutionRollbackMode) {
@@ -236,8 +241,10 @@ public:
}
private:
- bool IsSuccess(const TStatus& status) const {
- return status.IsSuccess() || (SuccessOnAlreadyExists && (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || status.GetIssues().ToOneLineString().Contains("error: path exist")));
+ bool IsAlreadyExistSuccessStatus(const TStatus& status) const {
+ return SuccessOnAlreadyExists &&
+ (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
+ status.GetIssues().ToOneLineString().Contains("error: path exist"));
}
private:
@@ -258,10 +265,28 @@ NActors::IActor* MakeCreateConnectionActor(
auto queryFactoryMethod =
[objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
signer = std::move(signer)](
- const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> TString {
- return MakeCreateExternalDataSourceQuery(request->Get()->Request.content(),
- objectStorageEndpoint,
- signer);
+ const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
+ -> std::vector<TSchemaQueryTask> {
+ auto& connectionContent = request->Get()->Request.content();
+
+ auto createSecretStatement =
+ CreateSecretObjectQuery(connectionContent.setting().object_storage().auth(),
+ connectionContent.name(),
+ signer);
+
+ std::vector<TSchemaQueryTask> statements;
+ if (createSecretStatement) {
+ statements.push_back(
+ TSchemaQueryTask{.SQL = *createSecretStatement,
+ .RollbackSQL = DropSecretObjectQuery(
+ connectionContent.setting().object_storage().auth(),
+ connectionContent.name(),
+ signer)});
+ }
+ statements.push_back(
+ TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
+ connectionContent, objectStorageEndpoint, signer)}});
+ return statements;
};
auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
@@ -298,58 +323,86 @@ NActors::IActor* MakeModifyConnectionActor(
using namespace fmt::literals;
auto& oldConnectionContent = (*request->Get()->OldConnectionContent);
+ auto& oldBindings = request->Get()->OldBindingContents;
auto& newConnectionContent = request->Get()->Request.content();
- auto deleteOldEntities = fmt::format(
- R"(
- {delete_external_data_tables};
- {delete_external_data_source};
- )",
- "delete_external_data_tables"_a =
- JoinMapRange("\n",
- request->Get()->OldBindingContents.begin(),
- request->Get()->OldBindingContents.end(),
- [](const FederatedQuery::BindingContent& binding) {
- return MakeDeleteExternalDataTableQuery(binding.name());
- }),
- "delete_external_data_source"_a =
- MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer));
-
- auto createOldEntities = fmt::format(
- R"(
- {create_external_data_source};
- {create_external_data_tables};
- )",
- "create_external_data_source"_a = MakeCreateExternalDataSourceQuery(
- oldConnectionContent, objectStorageEndpoint, signer),
- "create_external_data_tables"_a = JoinMapRange(
- "\n",
- request->Get()->OldBindingContents.begin(),
- request->Get()->OldBindingContents.end(),
- [&oldConnectionContent](const FederatedQuery::BindingContent& binding) {
- return MakeCreateExternalDataTableQuery(binding,
- oldConnectionContent.name());
- }));
-
- auto createNewEntities = fmt::format(
- R"(
- {create_external_data_source};
- {create_external_data_tables};
- )",
- "create_external_data_source"_a = MakeCreateExternalDataSourceQuery(
- newConnectionContent, objectStorageEndpoint, signer),
- "create_external_data_tables"_a = JoinMapRange(
- "\n",
- request->Get()->OldBindingContents.begin(),
- request->Get()->OldBindingContents.end(),
- [&newConnectionContent](const FederatedQuery::BindingContent& binding) {
- return MakeCreateExternalDataTableQuery(binding,
- newConnectionContent.name());
- }));
-
- return {TSchemaQueryTask{.SQL = TString{deleteOldEntities},
- .RollbackSQL = TString{createOldEntities}},
- TSchemaQueryTask{.SQL = TString{createNewEntities}}};
+ auto dropOldSecret =
+ DropSecretObjectQuery(oldConnectionContent.setting().object_storage().auth(),
+ oldConnectionContent.name(),
+ signer);
+ auto createNewSecret =
+ CreateSecretObjectQuery(newConnectionContent.setting().object_storage().auth(),
+ newConnectionContent.name(),
+ signer);
+ std::vector<TSchemaQueryTask> statements;
+
+ if (!oldBindings.empty()) {
+ statements.push_back(TSchemaQueryTask{
+ .SQL = JoinMapRange("\n",
+ oldBindings.begin(),
+ oldBindings.end(),
+ [](const FederatedQuery::BindingContent& binding) {
+ return MakeDeleteExternalDataTableQuery(
+ binding.name());
+ }),
+ .RollbackSQL = JoinMapRange(
+ "\n",
+ oldBindings.begin(),
+ oldBindings.end(),
+ [&oldConnectionContent](const FederatedQuery::BindingContent& binding) {
+ return MakeCreateExternalDataTableQuery(binding,
+ oldConnectionContent.name());
+ })});
+ };
+
+ statements.push_back(TSchemaQueryTask{
+ .SQL = TString{MakeDeleteExternalDataSourceQuery(oldConnectionContent.name())},
+ .RollbackSQL = TString{MakeCreateExternalDataSourceQuery(
+ oldConnectionContent, objectStorageEndpoint, signer)}});
+
+ if (dropOldSecret) {
+ statements.push_back(
+ TSchemaQueryTask{.SQL = *dropOldSecret,
+ .RollbackSQL = CreateSecretObjectQuery(
+ oldConnectionContent.setting().object_storage().auth(),
+ oldConnectionContent.name(),
+ signer)});
+ }
+ if (createNewSecret) {
+ statements.push_back(
+ TSchemaQueryTask{.SQL = *createNewSecret,
+ .RollbackSQL = DropSecretObjectQuery(
+ newConnectionContent.setting().object_storage().auth(),
+ newConnectionContent.name(),
+ signer)});
+ }
+
+ statements.push_back(
+ TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
+ newConnectionContent, objectStorageEndpoint, signer)},
+ .RollbackSQL = TString{MakeDeleteExternalDataSourceQuery(
+ newConnectionContent.name())}});
+
+ if (!oldBindings.empty()) {
+ statements.push_back(TSchemaQueryTask{
+ .SQL = JoinMapRange("\n",
+ oldBindings.begin(),
+ oldBindings.end(),
+ [&newConnectionContent](
+ const FederatedQuery::BindingContent& binding) {
+ return MakeCreateExternalDataTableQuery(
+ binding, newConnectionContent.name());
+ }),
+ .RollbackSQL =
+ JoinMapRange("\n",
+ request->Get()->OldBindingContents.begin(),
+ request->Get()->OldBindingContents.end(),
+ [](const FederatedQuery::BindingContent& binding) {
+ return MakeDeleteExternalDataTableQuery(binding.name());
+ })});
+ };
+
+ return statements;
};
auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
@@ -372,20 +425,39 @@ NActors::IActor* MakeDeleteConnectionActor(
TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
+ const NConfig::TCommonConfig& commonConfig,
TSigner::TPtr signer) {
auto queryFactoryMethod =
- [signer = std::move(signer)](
- const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request) -> TString {
- return MakeDeleteExternalDataSourceQuery(*request->Get()->ConnectionContent,
- signer);
+ [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
+ signer = std::move(signer)](
+ const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request)
+ -> std::vector<TSchemaQueryTask> {
+ auto& connectionContent = *request->Get()->ConnectionContent;
+
+ auto dropSecret =
+ DropSecretObjectQuery(connectionContent.setting().object_storage().auth(),
+ connectionContent.name(),
+ signer);
+
+ std::vector<TSchemaQueryTask> statements = {TSchemaQueryTask{
+ .SQL = TString{MakeDeleteExternalDataSourceQuery(connectionContent.name())},
+ .RollbackSQL = MakeCreateExternalDataSourceQuery(connectionContent,
+ objectStorageEndpoint,
+ signer)}};
+ if (dropSecret) {
+ statements.push_back(
+ TSchemaQueryTask{.SQL = *dropSecret,
+ .RollbackSQL = CreateSecretObjectQuery(
+ connectionContent.setting().object_storage().auth(),
+ connectionContent.name(),
+ signer)});
+ }
+ return statements;
};
auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
- return "External data source with such name already exists";
- } else {
- return "Couldn't delete external data source in YDB";
- }
+ Y_UNUSED(queryStatus);
+ return "Couldn't delete external data source in YDB";
};
return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteConnectionRequest,
@@ -398,6 +470,39 @@ NActors::IActor* MakeDeleteConnectionActor(
errorMessageFactoryMethod);
}
+NActors::IActor* MakeDropCreateConnectionActor(
+ const TActorId& proxyActorId,
+ TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ const NConfig::TCommonConfig& commonConfig,
+ TSigner::TPtr signer) {
+ auto queryFactoryMethod =
+ [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
+ signer = std::move(signer)](
+ const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
+ -> std::vector<TSchemaQueryTask> {
+ auto& connectionContent = request->Get()->Request.content();
+ return {TSchemaQueryTask{.SQL = TString{MakeDeleteExternalDataSourceQuery(
+ connectionContent.name())}},
+ TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
+ connectionContent, objectStorageEndpoint, signer)}}};
+ };
+ auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
+ Y_UNUSED(queryStatus);
+ return "Couldn't recreate external source in YDB";
+ };
+
+ return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest,
+ TEvControlPlaneProxy::TEvCreateConnectionResponse>(
+ proxyActorId,
+ std::move(request),
+ requestTimeout,
+ counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB),
+ queryFactoryMethod,
+ errorMessageFactoryMethod);
+}
+
/// Bindings actors
NActors::IActor* MakeCreateBindingActor(
const TActorId& proxyActorId,
@@ -477,11 +582,8 @@ NActors::IActor* MakeDeleteBindingActor(
};
auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
- if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
- return "External data source with such name already exists";
- } else {
- return "Couldn't delete external data source in YDB";
- }
+ Y_UNUSED(queryStatus);
+ return "Couldn't delete external data source in YDB";
};
return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvDeleteBindingRequest,
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
index 36b7a06370b..0df17eca66d 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
@@ -10,11 +10,9 @@
namespace NFq {
namespace NPrivate {
-using namespace NActors;
-
/// Connection manipulation actors
NActors::IActor* MakeCreateConnectionActor(
- const TActorId& proxyActorId,
+ const NActors::TActorId& proxyActorId,
TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
@@ -23,7 +21,7 @@ NActors::IActor* MakeCreateConnectionActor(
bool successOnAlreadyExists = false);
NActors::IActor* MakeModifyConnectionActor(
- const TActorId& proxyActorId,
+ const NActors::TActorId& proxyActorId,
TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
@@ -31,28 +29,37 @@ NActors::IActor* MakeModifyConnectionActor(
TSigner::TPtr signer);
NActors::IActor* MakeDeleteConnectionActor(
- const TActorId& proxyActorId,
+ const NActors::TActorId& proxyActorId,
TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
+ const NConfig::TCommonConfig& commonConfig,
+ TSigner::TPtr signer);
+
+NActors::IActor* MakeDropCreateConnectionActor(
+ const NActors::TActorId& proxyActorId,
+ TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
+ TDuration requestTimeout,
+ TCounters& counters,
+ const NConfig::TCommonConfig& commonConfig,
TSigner::TPtr signer);
/// Binding manipulation actors
NActors::IActor* MakeCreateBindingActor(
- const TActorId& proxyActorId,
+ const NActors::TActorId& proxyActorId,
TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
bool successOnAlreadyExists = false);
NActors::IActor* MakeModifyBindingActor(
- const TActorId& proxyActorId,
+ const NActors::TActorId& proxyActorId,
TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters);
NActors::IActor* MakeDeleteBindingActor(
- const TActorId& proxyActorId,
+ const NActors::TActorId& proxyActorId,
TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters);
diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
index c2db1d6771e..3022997a06a 100644
--- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -1254,7 +1254,6 @@ private:
CPP_LOG_T("CreateConnectionRequest: " << request.DebugString());
const TString cloudId = ev->Get()->CloudId;
const TString subjectType = ev->Get()->SubjectType;
- const bool ydbOperationWasPerformed = ev->Get()->ComputeYDBOperationWasPerformed;
const TString scope = ev->Get()->Scope;
TString user = ev->Get()->User;
TString token = ev->Get()->Token;
@@ -1310,7 +1309,10 @@ private:
return;
}
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->RequestValidationPassed) {
+ const auto isYDBOperationEnabled =
+ Config.ComputeConfig.IsYDBSchemaOperationsEnabled(
+ ev->Get()->Scope, ev->Get()->Request.content().setting().connection_case());
+ if (isYDBOperationEnabled && !ev->Get()->RequestValidationPassed) {
auto requestValidationIssues =
::NFq::ValidateConnection(ev,
Config.StorageConfig.Proto.GetMaxRequestSize(),
@@ -1340,7 +1342,7 @@ private:
TPermissions::TPermission::MANAGE_PUBLIC
};
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ydbOperationWasPerformed) {
+ if (isYDBOperationEnabled) {
if (!ev->Get()->YDBClient) {
ev->Get()->YDBClient = CreateNewTableClient(ev,
Config.ComputeConfig,
@@ -1348,14 +1350,55 @@ private:
CredentialsProviderFactory);
}
- Register(NPrivate::MakeCreateConnectionActor(
- ControlPlaneProxyActorId(),
- std::move(ev),
- Config.RequestTimeout,
- Counters,
- Config.CommonConfig,
- Signer));
- return;
+ if (!ev->Get()->ComputeYDBOperationWasPerformed && !ev->Get()->ComputeYDBIsAlreadyExistFlag) {
+ Register(NPrivate::MakeCreateConnectionActor(ControlPlaneProxyActorId(),
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters,
+ Config.CommonConfig,
+ Signer,
+ true));
+ return;
+ }
+
+ if (ev->Get()->ComputeYDBIsAlreadyExistFlag && !ev->Get()->CPSListingFinished) {
+ ev->Get()->ComputeYDBOperationWasPerformed = false;
+ Register(MakeListConnectionActor(ControlPlaneProxyActorId(),
+ std::move(ev),
+ Counters,
+ Config.RequestTimeout,
+ availablePermissions));
+ return;
+ }
+
+ if (ev->Get()->ComputeYDBIsAlreadyExistFlag &&
+ !ev->Get()->ComputeYDBOperationWasPerformed) {
+ if (ev->Get()->CPSConnectionCount != 0) {
+ CPS_LOG_E("CreateConnectionRequest, Connection with such name already exists: "
+ << scope << " " << user << " " << NKikimr::MaskTicket(token)
+ << " " << request.DebugString());
+ Send(ev->Sender,
+ new TEvControlPlaneProxy::TEvCreateConnectionResponse(
+ NYql::TIssues{
+ NYql::TIssue{"Connection with such name already exists"}},
+ subjectType),
+ 0,
+ ev->Cookie);
+ requestCounters.IncError();
+ TDuration delta = TInstant::Now() - startTime;
+ requestCounters.Common->LatencyMs->Collect(delta.MilliSeconds());
+ probe(delta, false, false);
+ return;
+ }
+
+ Register(NPrivate::MakeDropCreateConnectionActor(ControlPlaneProxyActorId(),
+ std::move(ev),
+ Config.RequestTimeout,
+ Counters,
+ Config.CommonConfig,
+ Signer));
+ return;
+ }
}
Register(new TRequestActor<FederatedQuery::CreateConnectionRequest,
@@ -1564,8 +1607,10 @@ private:
return;
}
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) &&
- !ev->Get()->RequestValidationPassed) {
+ const auto isYDBOperationEnabled = Config.ComputeConfig.IsYDBSchemaOperationsEnabled(
+ ev->Get()->Scope, ev->Get()->Request.content().setting().connection_case());
+
+ if (isYDBOperationEnabled && !ev->Get()->RequestValidationPassed) {
auto requestValidationIssues =
::NFq::ValidateConnection(ev,
Config.StorageConfig.Proto.GetMaxRequestSize(),
@@ -1598,28 +1643,25 @@ private:
| TPermissions::TPermission::VIEW_PRIVATE
};
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->OldConnectionContent) {
+ if (isYDBOperationEnabled && !ev->Get()->OldConnectionContent) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeDiscoverYDBConnectionName(
+ Register(MakeDiscoverYDBConnectionNameActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->OldBindingNamesDiscoveryFinished) {
+ if (isYDBOperationEnabled && !ev->Get()->OldBindingNamesDiscoveryFinished) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeListBindingIds(
+ Register(MakeListBindingIdsActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && ev->Get()->OldBindingIds.size() != ev->Get()->OldBindingContents.size()) {
+ if (isYDBOperationEnabled && ev->Get()->OldBindingIds.size() != ev->Get()->OldBindingContents.size()) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeDescribeListedBinding(
+ Register(MakeDescribeListedBindingActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
- const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed;
- if (!controlPlaneYDBOperationWasPerformed) {
- auto shouldReplyWithResponse =
- !Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope);
+ if (!ev->Get()->ControlPlaneYDBOperationWasPerformed) {
Register(new TRequestActor<FederatedQuery::ModifyConnectionRequest,
TEvControlPlaneStorage::TEvModifyConnectionRequest,
TEvControlPlaneStorage::TEvModifyConnectionResponse,
@@ -1631,11 +1673,11 @@ private:
requestCounters,
probe,
availablePermissions,
- shouldReplyWithResponse));
+ !isYDBOperationEnabled));
return;
}
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope)) {
+ if (isYDBOperationEnabled) {
if (!ev->Get()->YDBClient) {
ev->Get()->YDBClient = CreateNewTableClient(ev,
Config.ComputeConfig,
@@ -1725,15 +1767,19 @@ private:
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ev->Get()->ConnectionContent) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeDiscoverYDBConnectionName(
+ Register(MakeDiscoverYDBConnectionNameActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
- const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed;
- if (!controlPlaneYDBOperationWasPerformed) {
- auto shouldReplyWithResponse =
- !Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope);
+ const auto isYDBOperationEnabled =
+ ev->Get()->ConnectionContent
+ ? Config.ComputeConfig.IsYDBSchemaOperationsEnabled(
+ ev->Get()->Scope,
+ ev->Get()->ConnectionContent->setting().connection_case())
+ : false;
+
+ if (!ev->Get()->ControlPlaneYDBOperationWasPerformed) {
Register(new TRequestActor<FederatedQuery::DeleteConnectionRequest,
TEvControlPlaneStorage::TEvDeleteConnectionRequest,
TEvControlPlaneStorage::TEvDeleteConnectionResponse,
@@ -1745,11 +1791,11 @@ private:
requestCounters,
probe,
availablePermissions,
- shouldReplyWithResponse));
+ !isYDBOperationEnabled));
return;
}
- if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope)) {
+ if (isYDBOperationEnabled) {
if (!ev->Get()->YDBClient) {
ev->Get()->YDBClient = CreateNewTableClient(ev,
Config.ComputeConfig,
@@ -1758,8 +1804,12 @@ private:
}
if (!ev->Get()->ComputeYDBOperationWasPerformed) {
- Register(MakeDeleteConnectionActor(
- ControlPlaneProxyActorId(), ev, Config.RequestTimeout, Counters, Signer));
+ Register(MakeDeleteConnectionActor(ControlPlaneProxyActorId(),
+ ev,
+ Config.RequestTimeout,
+ Counters,
+ Config.CommonConfig,
+ Signer));
return;
}
@@ -1923,7 +1973,7 @@ private:
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) && !ydbOperationWasPerformed) {
if (!ev->Get()->ConnectionName) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeDiscoverYDBConnectionName(
+ Register(MakeDiscoverYDBConnectionNameActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
@@ -2177,13 +2227,13 @@ private:
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope)) {
if (!ev->Get()->OldBindingContent) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeDiscoverYDBBindingName(
+ Register(MakeDiscoverYDBBindingNameActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
if (!ev->Get()->ConnectionName) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeDiscoverYDBConnectionName(
+ Register(MakeDiscoverYDBConnectionNameActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
@@ -2296,7 +2346,7 @@ private:
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled(ev->Get()->Scope) &&
!ev->Get()->OldBindingName) {
auto permissions = ExtractPermissions(ev, availablePermissions);
- Register(MakeDiscoverYDBBindingName(
+ Register(MakeDiscoverYDBBindingNameActor(
ControlPlaneProxyActorId(), ev, Counters, Config.RequestTimeout, permissions));
return;
}
diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h
index 28d61bf2ba1..ebd103ae226 100644
--- a/ydb/core/fq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h
@@ -301,6 +301,25 @@ struct TEvControlPlaneProxy {
};
template<>
+ struct TControlPlaneRequest<FederatedQuery::CreateConnectionRequest,
+ EvCreateConnectionRequest> :
+ public TBaseControlPlaneRequest<TEvCreateConnectionRequest,
+ FederatedQuery::CreateConnectionRequest,
+ EvCreateConnectionRequest> {
+ using TBaseControlPlaneRequest<
+ TControlPlaneRequest<FederatedQuery::CreateConnectionRequest, EvCreateConnectionRequest>,
+ FederatedQuery::CreateConnectionRequest,
+ EvCreateConnectionRequest>::TBaseControlPlaneRequest;
+
+ bool ComputeYDBIsAlreadyExistFlag = false;
+ // Check that connection does not exist in CPS
+ bool CPSListingFinished = false;
+ size_t CPSConnectionCount = 0;
+ // ??
+ bool IsInDeleteCreateState = false;
+ };
+
+ template<>
struct TControlPlaneRequest<FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest> :
public TBaseControlPlaneRequest<TEvModifyConnectionRequest,
FederatedQuery::ModifyConnectionRequest,
@@ -310,6 +329,7 @@ struct TEvControlPlaneProxy {
FederatedQuery::ModifyConnectionRequest,
EvModifyConnectionRequest>::TBaseControlPlaneRequest;
+ bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent;
// ListBindings
bool OldBindingNamesDiscoveryFinished = false;
@@ -329,6 +349,7 @@ struct TEvControlPlaneProxy {
FederatedQuery::DeleteConnectionRequest,
EvDeleteConnectionRequest>::TBaseControlPlaneRequest;
+ bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<FederatedQuery::ConnectionContent> ConnectionContent;
};
@@ -342,6 +363,7 @@ struct TEvControlPlaneProxy {
FederatedQuery::CreateBindingRequest,
EvCreateBindingRequest>::TBaseControlPlaneRequest;
+ bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<TString> ConnectionName;
};
@@ -355,6 +377,7 @@ struct TEvControlPlaneProxy {
FederatedQuery::ModifyBindingRequest,
EvModifyBindingRequest>::TBaseControlPlaneRequest;
+ bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<FederatedQuery::BindingContent> OldBindingContent;
TMaybe<TString> ConnectionName;
};
@@ -369,6 +392,7 @@ struct TEvControlPlaneProxy {
FederatedQuery::DeleteBindingRequest,
EvDeleteBindingRequest>::TBaseControlPlaneRequest;
+ bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<TString> OldBindingName;
};
};
diff --git a/ydb/core/fq/libs/control_plane_storage/events/events.h b/ydb/core/fq/libs/control_plane_storage/events/events.h
index bc47f475585..5ef1b4bc5dd 100644
--- a/ydb/core/fq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/fq/libs/control_plane_storage/events/events.h
@@ -175,19 +175,20 @@ struct TEvControlPlaneStorage {
static_assert(EvEnd <= YqEventSubspaceEnd(NFq::TYqEventSubspace::ControlPlaneStorage), "All events must be in their subspace");
- template<typename ProtoMessage, ui32 EventType>
- struct TControlPlaneRequest : NActors::TEventLocal<TControlPlaneRequest<ProtoMessage, EventType>, EventType> {
+ template<typename TDerived, typename ProtoMessage, ui32 EventType>
+ struct TBaseControlPlaneRequest : NActors::TEventLocal<TDerived, EventType> {
using TProto = ProtoMessage;
- explicit TControlPlaneRequest(const TString& scope,
- const ProtoMessage& request,
- const TString& user,
- const TString& token,
- const TString& cloudId,
- TPermissions permissions,
- TMaybe<TQuotaMap> quotas,
- TTenantInfo::TPtr tenantInfo,
- const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase)
+ explicit TBaseControlPlaneRequest(
+ const TString& scope,
+ const ProtoMessage& request,
+ const TString& user,
+ const TString& token,
+ const TString& cloudId,
+ TPermissions permissions,
+ TMaybe<TQuotaMap> quotas,
+ TTenantInfo::TPtr tenantInfo,
+ const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase)
: Scope(scope)
, Request(request)
, User(user)
@@ -196,9 +197,7 @@ struct TEvControlPlaneStorage {
, Permissions(permissions)
, Quotas(std::move(quotas))
, TenantInfo(tenantInfo)
- , ComputeDatabase(computeDatabase)
- {
- }
+ , ComputeDatabase(computeDatabase) { }
size_t GetByteSize() const {
return sizeof(*this)
@@ -220,6 +219,50 @@ struct TEvControlPlaneStorage {
FederatedQuery::Internal::ComputeDatabaseInternal ComputeDatabase;
};
+ template<typename TProtoMessage, ui32 EventType>
+ struct TControlPlaneRequest :
+ TBaseControlPlaneRequest<TControlPlaneRequest<TProtoMessage, EventType>,
+ TProtoMessage,
+ EventType> {
+ using TBaseControlPlaneRequest<TControlPlaneRequest<TProtoMessage, EventType>,
+ TProtoMessage,
+ EventType>::TBaseControlPlaneRequest;
+ };
+
+ template<typename ProtoMessage, ui32 EventType>
+ struct TControlPlaneListRequest :
+ public TBaseControlPlaneRequest<TControlPlaneListRequest<ProtoMessage, EventType>,
+ ProtoMessage,
+ EventType> {
+ using TProto = ProtoMessage;
+
+ explicit TControlPlaneListRequest(
+ const TString& scope,
+ const ProtoMessage& request,
+ const TString& user,
+ const TString& token,
+ const TString& cloudId,
+ TPermissions permissions,
+ TMaybe<TQuotaMap> quotas,
+ TTenantInfo::TPtr tenantInfo,
+ const FederatedQuery::Internal::ComputeDatabaseInternal& computeDatabase,
+ bool isExactNameMatch = false)
+ : TBaseControlPlaneRequest<TControlPlaneListRequest<ProtoMessage, EventType>,
+ ProtoMessage,
+ EventType>(scope,
+ request,
+ user,
+ token,
+ cloudId,
+ permissions,
+ std::move(quotas),
+ tenantInfo,
+ computeDatabase)
+ , IsExactNameMatch(isExactNameMatch) { }
+
+ bool IsExactNameMatch = false;
+ };
+
template<typename TDerived, typename ProtoMessage, ui32 EventType>
struct TControlPlaneResponse : NActors::TEventLocal<TDerived, EventType> {
using TProto = ProtoMessage;
@@ -326,7 +369,7 @@ struct TEvControlPlaneStorage {
using TEvDescribeJobResponse = TControlPlaneNonAuditableResponse<FederatedQuery::DescribeJobResult, EvDescribeJobResponse>;
using TEvCreateConnectionRequest = TControlPlaneRequest<FederatedQuery::CreateConnectionRequest, EvCreateConnectionRequest>;
using TEvCreateConnectionResponse = TControlPlaneAuditableResponse<FederatedQuery::CreateConnectionResult, FederatedQuery::Connection, EvCreateConnectionResponse>;
- using TEvListConnectionsRequest = TControlPlaneRequest<FederatedQuery::ListConnectionsRequest, EvListConnectionsRequest>;
+ using TEvListConnectionsRequest = TControlPlaneListRequest<FederatedQuery::ListConnectionsRequest, EvListConnectionsRequest>;
using TEvListConnectionsResponse = TControlPlaneNonAuditableResponse<FederatedQuery::ListConnectionsResult, EvListConnectionsResponse>;
using TEvDescribeConnectionRequest = TControlPlaneRequest<FederatedQuery::DescribeConnectionRequest, EvDescribeConnectionRequest>;
using TEvDescribeConnectionResponse = TControlPlaneNonAuditableResponse<FederatedQuery::DescribeConnectionResult, EvDescribeConnectionResponse>;
@@ -336,7 +379,7 @@ struct TEvControlPlaneStorage {
using TEvDeleteConnectionResponse = TControlPlaneAuditableResponse<FederatedQuery::DeleteConnectionResult, FederatedQuery::Connection, EvDeleteConnectionResponse>;
using TEvCreateBindingRequest = TControlPlaneRequest<FederatedQuery::CreateBindingRequest, EvCreateBindingRequest>;
using TEvCreateBindingResponse = TControlPlaneAuditableResponse<FederatedQuery::CreateBindingResult, FederatedQuery::Binding, EvCreateBindingResponse>;
- using TEvListBindingsRequest = TControlPlaneRequest<FederatedQuery::ListBindingsRequest, EvListBindingsRequest>;
+ using TEvListBindingsRequest = TControlPlaneListRequest<FederatedQuery::ListBindingsRequest, EvListBindingsRequest>;
using TEvListBindingsResponse = TControlPlaneNonAuditableResponse<FederatedQuery::ListBindingsResult, EvListBindingsResponse>;
using TEvDescribeBindingRequest = TControlPlaneRequest<FederatedQuery::DescribeBindingRequest, EvDescribeBindingRequest>;
using TEvDescribeBindingResponse = TControlPlaneNonAuditableResponse<FederatedQuery::DescribeBindingResult, EvDescribeBindingResponse>;
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
index 3e134579bd0..ddeaf865d23 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_bindings.cpp
@@ -214,7 +214,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListBinding
if (request.filter().name()) {
queryBuilder.AddString("filter_name", request.filter().name());
- filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'");
+ if (event.IsExactNameMatch) {
+ filters.push_back("`" NAME_COLUMN_NAME "` = '$filter_name'");
+ } else {
+ filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'");
+ }
}
if (request.filter().created_by_me()) {
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
index 8edb3dc7010..412b5b28ea1 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_connections.cpp
@@ -204,7 +204,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvListConnect
TVector<TString> filters;
if (request.filter().name()) {
queryBuilder.AddString("filter_name", request.filter().name());
- filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'");
+ if (event.IsExactNameMatch) {
+ filters.push_back("`" NAME_COLUMN_NAME "` = '$filter_name'");
+ } else {
+ filters.push_back("`" NAME_COLUMN_NAME "` LIKE '%' || $filter_name || '%'");
+ }
}
if (request.filter().created_by_me()) {