diff options
author | hcpp <hcpp@ydb.tech> | 2023-08-28 19:11:35 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-08-28 20:30:28 +0300 |
commit | 80813963258b24f435ecede4c9f9f915ad2f1340 (patch) | |
tree | ef0f66fe2cfe3aec536a8bcbb410c1dfc4120d22 | |
parent | 4fe57f476dc466219c975b4d11ee46f33bfcd762 (diff) | |
download | ydb-80813963258b24f435ecede4c9f9f915ad2f1340.tar.gz |
execution config has been fixed
7 files changed, 58 insertions, 24 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index a584951edc..b901678577 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -349,7 +349,7 @@ private: *resources.mutable_topic_consumers() = task.created_topic_consumers(); } - NFq::NConfig::TYdbStorageConfig computeConnection = ComputeConfig.GetConnection(task.scope()); + NFq::NConfig::TYdbStorageConfig computeConnection = ComputeConfig.GetExecutionConnection(task.scope()); computeConnection.set_endpoint(task.compute_connection().endpoint()); computeConnection.set_database(task.compute_connection().database()); computeConnection.set_usessl(task.compute_connection().usessl()); diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h index 938d1dcba8..974d53e15d 100644 --- a/ydb/core/fq/libs/compute/common/config.h +++ b/ydb/core/fq/libs/compute/common/config.h @@ -68,7 +68,7 @@ public: } } - NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope) const { + NFq::NConfig::TYdbStorageConfig GetControlPlaneConnection(const TString& scope) const { const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane(); switch (controlPlane.type_case()) { case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET: @@ -76,22 +76,48 @@ public: case NConfig::TYdbComputeControlPlane::kSingle: return controlPlane.GetSingle().GetConnection(); case NConfig::TYdbComputeControlPlane::kCms: - return GetConnection(scope, controlPlane.GetCms().GetDatabaseMapping()); + return GetControlPlaneConnection(scope, controlPlane.GetCms().GetDatabaseMapping()); case NConfig::TYdbComputeControlPlane::kYdbcp: - return GetConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping()); + return GetControlPlaneConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping()); } } - NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const { + NFq::NConfig::TYdbStorageConfig GetControlPlaneConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const { auto it = databaseMapping.GetScopeToComputeDatabase().find(scope); if (it != databaseMapping.GetScopeToComputeDatabase().end()) { - return it->second.GetConnection(); + return it->second.GetControlPlaneConnection(); } return databaseMapping.GetCommon().empty() ? NFq::NConfig::TYdbStorageConfig{} : databaseMapping .GetCommon(MultiHash(scope) % databaseMapping.GetCommon().size()) - .GetConnection(); + .GetControlPlaneConnection(); + } + + NFq::NConfig::TYdbStorageConfig GetExecutionConnection(const TString& scope) const { + const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane(); + switch (controlPlane.type_case()) { + case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET: + return {}; + case NConfig::TYdbComputeControlPlane::kSingle: + return controlPlane.GetSingle().GetConnection(); + case NConfig::TYdbComputeControlPlane::kCms: + return GetExecutionConnection(scope, controlPlane.GetCms().GetDatabaseMapping()); + case NConfig::TYdbComputeControlPlane::kYdbcp: + return GetExecutionConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping()); + } + } + + NFq::NConfig::TYdbStorageConfig GetExecutionConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const { + auto it = databaseMapping.GetScopeToComputeDatabase().find(scope); + if (it != databaseMapping.GetScopeToComputeDatabase().end()) { + return it->second.GetExecutionConnection(); + } + return databaseMapping.GetCommon().empty() + ? NFq::NConfig::TYdbStorageConfig{} + : databaseMapping + .GetCommon(MultiHash(scope) % databaseMapping.GetCommon().size()) + .GetExecutionConnection(); } bool YdbComputeControlPlaneEnabled(const TString& scope) const { diff --git a/ydb/core/fq/libs/compute/common/utils.h b/ydb/core/fq/libs/compute/common/utils.h index 40654687c6..3ee72cb19a 100644 --- a/ydb/core/fq/libs/compute/common/utils.h +++ b/ydb/core/fq/libs/compute/common/utils.h @@ -13,7 +13,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS const TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { - NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(scope); + NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetExecutionConnection(scope); computeConnection.set_endpoint(connection.endpoint()); computeConnection.set_database(connection.database()); computeConnection.set_usessl(connection.usessl()); diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp index 6b564e49a9..2038c00023 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp @@ -95,9 +95,9 @@ public: } forwardResponse->Result.set_id(request.Get()->Get()->Path); - forwardResponse->Result.mutable_connection()->set_endpoint(Settings.Endpoint); + forwardResponse->Result.mutable_connection()->set_endpoint(request->Get()->ExecutionConnection.GetEndpoint()); forwardResponse->Result.mutable_connection()->set_database(request.Get()->Get()->Path); - forwardResponse->Result.mutable_connection()->set_usessl(Settings.EnableSsl); + forwardResponse->Result.mutable_connection()->set_usessl(request->Get()->ExecutionConnection.GetUseSsl()); Send(request->Sender, forwardResponse.release(), 0, request->Cookie); } diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp index ce8f98d83a..59650e1e5f 100644 --- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp @@ -33,11 +33,12 @@ using namespace NFq; class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDatabaseRequestActor> { public: - TCreateDatabaseRequestActor(const TActorId& databaseClientActorId, const TActorId& synchronizationServiceActorId, const NFq::NConfig::TComputeConfig& config, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& request) + TCreateDatabaseRequestActor(const TActorId& databaseClientActorId, const TActorId& synchronizationServiceActorId, const NFq::NConfig::TComputeConfig& config, const NFq::NConfig::TYdbStorageConfig& executionConnection, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& request) : DatabaseClientActorId(databaseClientActorId) , SynchronizationServiceActorId(synchronizationServiceActorId) , Config(config) , Request(request) + , ExecutionConnection(executionConnection) {} static constexpr char ActorName[] = "FQ_CREATE_DATABASE_REQUEST_ACTOR"; @@ -72,7 +73,7 @@ public: const auto& result = ev->Get()->Record; if (issues && issues.back().IssueCode == TIssuesIds::ACCESS_DENIED) { - Send(DatabaseClientActorId, new TEvYdbCompute::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Request->Get()->BasePath, Request->Get()->Path}); + Send(DatabaseClientActorId, new TEvYdbCompute::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Request->Get()->BasePath, Request->Get()->Path, ExecutionConnection}); return; } @@ -128,6 +129,7 @@ private: NFq::NConfig::TComputeConfig Config; TEvYdbCompute::TEvCreateDatabaseRequest::TPtr Request; FederatedQuery::Internal::ComputeDatabaseInternal Result; + NFq::NConfig::TYdbStorageConfig ExecutionConnection; }; class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrapped<TComputeDatabaseControlPlaneServiceActor> { @@ -177,7 +179,7 @@ public: static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) { NCloud::TGrpcClientSettings settings; - const auto& connection = config.GetConnection(); + const auto& connection = config.GetControlPlaneConnection(); settings.Endpoint = connection.GetEndpoint(); settings.EnableSsl = connection.GetUseSsl(); if (connection.GetCertificateFile()) { @@ -189,26 +191,26 @@ public: void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig) { const auto& mapping = cmsConfig.GetDatabaseMapping(); for (const auto& config: mapping.GetCommon()) { - CommonDatabaseClients.push_back({Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}); + CommonDatabaseClients.push_back({Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config}); } Y_VERIFY(CommonDatabaseClients); for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { - ScopeToDatabaseClient[scope] = {Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}; + ScopeToDatabaseClient[scope] = {Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config}; } } void CreateControlPlaneClientActors(const NConfig::TYdbComputeControlPlane::TYdbcp& controlPlaneConfig) { const auto& mapping = controlPlaneConfig.GetDatabaseMapping(); for (const auto& config: mapping.GetCommon()) { - CommonDatabaseClients.push_back({Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}); + CommonDatabaseClients.push_back({Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config}); } Y_VERIFY(CommonDatabaseClients); for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) { - ScopeToDatabaseClient[scope] = {Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config}; + ScopeToDatabaseClient[scope] = {Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config}; } } @@ -218,7 +220,7 @@ public: void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) { if (Config.GetYdb().GetControlPlane().HasSingle()) { - Register(new TCreateDatabaseRequestActor(TActorId{}, SynchronizationServiceActorId, Config, ev)); + Register(new TCreateDatabaseRequestActor(TActorId{}, SynchronizationServiceActorId, Config, Config.GetYdb().GetControlPlane().GetSingle().GetConnection(), ev)); return; } @@ -226,18 +228,19 @@ public: auto it = ScopeToDatabaseClient.find(scope); if (it != ScopeToDatabaseClient.end()) { FillRequest(ev, it->second.Config); - Register(new TCreateDatabaseRequestActor(it->second.ActorId, SynchronizationServiceActorId, Config, ev)); + Register(new TCreateDatabaseRequestActor(it->second.ActorId, SynchronizationServiceActorId, Config, it->second.Config.GetExecutionConnection(), ev)); return; } const auto& clientConfig = CommonDatabaseClients[MultiHash(scope) % CommonDatabaseClients.size()]; FillRequest(ev, clientConfig.Config); - Register(new TCreateDatabaseRequestActor(clientConfig.ActorId, SynchronizationServiceActorId, Config, ev)); + Register(new TCreateDatabaseRequestActor(clientConfig.ActorId, SynchronizationServiceActorId, Config, clientConfig.Config.GetExecutionConnection(), ev)); } void FillRequest(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev, const NConfig::TComputeDatabaseConfig& config) { NYdb::NFq::TScope scope(ev.Get()->Get()->Scope); - ev.Get()->Get()->BasePath = config.GetConnection().GetDatabase(); - ev.Get()->Get()->Path = config.GetTenant() ? config.GetTenant() + "/" + scope.ParseFolder() : scope.ParseFolder(); + ev.Get()->Get()->BasePath = config.GetControlPlaneConnection().GetDatabase(); + const TString databaseName = Config.GetYdb().GetControlPlane().GetDatabasePrefix() + scope.ParseFolder(); + ev.Get()->Get()->Path = config.GetTenant() ? config.GetTenant() + "/" + databaseName: databaseName; } private: diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index c6100a24c6..f2e5b6592f 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -191,17 +191,20 @@ struct TEvYdbCompute { TEvCreateDatabaseRequest(const TString& cloudId, const TString& scope, const TString& basePath, - const TString& path) + const TString& path, + const NFq::NConfig::TYdbStorageConfig& executionConnection) : CloudId(cloudId) , Scope(scope) , BasePath(basePath) , Path(path) + , ExecutionConnection(executionConnection) {} TString CloudId; TString Scope; TString BasePath; TString Path; + NFq::NConfig::TYdbStorageConfig ExecutionConnection; }; struct TEvCreateDatabaseResponse : public NActors::TEventLocal<TEvCreateDatabaseResponse, EvCreateDatabaseResponse> { diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto index 7c9d857a4a..c59b31afee 100644 --- a/ydb/core/fq/libs/config/protos/compute.proto +++ b/ydb/core/fq/libs/config/protos/compute.proto @@ -18,7 +18,8 @@ message TInPlaceCompute { } message TComputeDatabaseConfig { - TYdbStorageConfig Connection = 1; + TYdbStorageConfig ControlPlaneConnection = 1; + TYdbStorageConfig ExecutionConnection = 3; string Tenant = 2; } @@ -46,6 +47,7 @@ message TYdbComputeControlPlane { TCms Cms = 3; TYdbcp Ydbcp = 4; } + string DatabasePrefix = 5; } message TYdbCompute { |