aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-08-28 19:11:35 +0300
committerhcpp <hcpp@ydb.tech>2023-08-28 20:30:28 +0300
commit80813963258b24f435ecede4c9f9f915ad2f1340 (patch)
treeef0f66fe2cfe3aec536a8bcbb410c1dfc4120d22
parent4fe57f476dc466219c975b4d11ee46f33bfcd762 (diff)
downloadydb-80813963258b24f435ecede4c9f9f915ad2f1340.tar.gz
execution config has been fixed
-rw-r--r--ydb/core/fq/libs/actors/pending_fetcher.cpp2
-rw-r--r--ydb/core/fq/libs/compute/common/config.h38
-rw-r--r--ydb/core/fq/libs/compute/common/utils.h2
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp4
-rw-r--r--ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp27
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h5
-rw-r--r--ydb/core/fq/libs/config/protos/compute.proto4
7 files changed, 58 insertions, 24 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp
index a584951edc..b901678577 100644
--- a/ydb/core/fq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp
@@ -349,7 +349,7 @@ private:
*resources.mutable_topic_consumers() = task.created_topic_consumers();
}
- NFq::NConfig::TYdbStorageConfig computeConnection = ComputeConfig.GetConnection(task.scope());
+ NFq::NConfig::TYdbStorageConfig computeConnection = ComputeConfig.GetExecutionConnection(task.scope());
computeConnection.set_endpoint(task.compute_connection().endpoint());
computeConnection.set_database(task.compute_connection().database());
computeConnection.set_usessl(task.compute_connection().usessl());
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h
index 938d1dcba8..974d53e15d 100644
--- a/ydb/core/fq/libs/compute/common/config.h
+++ b/ydb/core/fq/libs/compute/common/config.h
@@ -68,7 +68,7 @@ public:
}
}
- NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope) const {
+ NFq::NConfig::TYdbStorageConfig GetControlPlaneConnection(const TString& scope) const {
const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane();
switch (controlPlane.type_case()) {
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
@@ -76,22 +76,48 @@ public:
case NConfig::TYdbComputeControlPlane::kSingle:
return controlPlane.GetSingle().GetConnection();
case NConfig::TYdbComputeControlPlane::kCms:
- return GetConnection(scope, controlPlane.GetCms().GetDatabaseMapping());
+ return GetControlPlaneConnection(scope, controlPlane.GetCms().GetDatabaseMapping());
case NConfig::TYdbComputeControlPlane::kYdbcp:
- return GetConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping());
+ return GetControlPlaneConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping());
}
}
- NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const {
+ NFq::NConfig::TYdbStorageConfig GetControlPlaneConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const {
auto it = databaseMapping.GetScopeToComputeDatabase().find(scope);
if (it != databaseMapping.GetScopeToComputeDatabase().end()) {
- return it->second.GetConnection();
+ return it->second.GetControlPlaneConnection();
}
return databaseMapping.GetCommon().empty()
? NFq::NConfig::TYdbStorageConfig{}
: databaseMapping
.GetCommon(MultiHash(scope) % databaseMapping.GetCommon().size())
- .GetConnection();
+ .GetControlPlaneConnection();
+ }
+
+ NFq::NConfig::TYdbStorageConfig GetExecutionConnection(const TString& scope) const {
+ const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane();
+ switch (controlPlane.type_case()) {
+ case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
+ return {};
+ case NConfig::TYdbComputeControlPlane::kSingle:
+ return controlPlane.GetSingle().GetConnection();
+ case NConfig::TYdbComputeControlPlane::kCms:
+ return GetExecutionConnection(scope, controlPlane.GetCms().GetDatabaseMapping());
+ case NConfig::TYdbComputeControlPlane::kYdbcp:
+ return GetExecutionConnection(scope, controlPlane.GetYdbcp().GetDatabaseMapping());
+ }
+ }
+
+ NFq::NConfig::TYdbStorageConfig GetExecutionConnection(const TString& scope, const ::NFq::NConfig::TDatabaseMapping& databaseMapping) const {
+ auto it = databaseMapping.GetScopeToComputeDatabase().find(scope);
+ if (it != databaseMapping.GetScopeToComputeDatabase().end()) {
+ return it->second.GetExecutionConnection();
+ }
+ return databaseMapping.GetCommon().empty()
+ ? NFq::NConfig::TYdbStorageConfig{}
+ : databaseMapping
+ .GetCommon(MultiHash(scope) % databaseMapping.GetCommon().size())
+ .GetExecutionConnection();
}
bool YdbComputeControlPlaneEnabled(const TString& scope) const {
diff --git a/ydb/core/fq/libs/compute/common/utils.h b/ydb/core/fq/libs/compute/common/utils.h
index 40654687c6..3ee72cb19a 100644
--- a/ydb/core/fq/libs/compute/common/utils.h
+++ b/ydb/core/fq/libs/compute/common/utils.h
@@ -13,7 +13,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS
const TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
- NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(scope);
+ NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetExecutionConnection(scope);
computeConnection.set_endpoint(connection.endpoint());
computeConnection.set_database(connection.database());
computeConnection.set_usessl(connection.usessl());
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
index 6b564e49a9..2038c00023 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/cms_grpc_client_actor.cpp
@@ -95,9 +95,9 @@ public:
}
forwardResponse->Result.set_id(request.Get()->Get()->Path);
- forwardResponse->Result.mutable_connection()->set_endpoint(Settings.Endpoint);
+ forwardResponse->Result.mutable_connection()->set_endpoint(request->Get()->ExecutionConnection.GetEndpoint());
forwardResponse->Result.mutable_connection()->set_database(request.Get()->Get()->Path);
- forwardResponse->Result.mutable_connection()->set_usessl(Settings.EnableSsl);
+ forwardResponse->Result.mutable_connection()->set_usessl(request->Get()->ExecutionConnection.GetUseSsl());
Send(request->Sender, forwardResponse.release(), 0, request->Cookie);
}
diff --git a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
index ce8f98d83a..59650e1e5f 100644
--- a/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp
@@ -33,11 +33,12 @@ using namespace NFq;
class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDatabaseRequestActor> {
public:
- TCreateDatabaseRequestActor(const TActorId& databaseClientActorId, const TActorId& synchronizationServiceActorId, const NFq::NConfig::TComputeConfig& config, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& request)
+ TCreateDatabaseRequestActor(const TActorId& databaseClientActorId, const TActorId& synchronizationServiceActorId, const NFq::NConfig::TComputeConfig& config, const NFq::NConfig::TYdbStorageConfig& executionConnection, TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& request)
: DatabaseClientActorId(databaseClientActorId)
, SynchronizationServiceActorId(synchronizationServiceActorId)
, Config(config)
, Request(request)
+ , ExecutionConnection(executionConnection)
{}
static constexpr char ActorName[] = "FQ_CREATE_DATABASE_REQUEST_ACTOR";
@@ -72,7 +73,7 @@ public:
const auto& result = ev->Get()->Record;
if (issues && issues.back().IssueCode == TIssuesIds::ACCESS_DENIED) {
- Send(DatabaseClientActorId, new TEvYdbCompute::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Request->Get()->BasePath, Request->Get()->Path});
+ Send(DatabaseClientActorId, new TEvYdbCompute::TEvCreateDatabaseRequest{Request->Get()->CloudId, Request->Get()->Scope, Request->Get()->BasePath, Request->Get()->Path, ExecutionConnection});
return;
}
@@ -128,6 +129,7 @@ private:
NFq::NConfig::TComputeConfig Config;
TEvYdbCompute::TEvCreateDatabaseRequest::TPtr Request;
FederatedQuery::Internal::ComputeDatabaseInternal Result;
+ NFq::NConfig::TYdbStorageConfig ExecutionConnection;
};
class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrapped<TComputeDatabaseControlPlaneServiceActor> {
@@ -177,7 +179,7 @@ public:
static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
NCloud::TGrpcClientSettings settings;
- const auto& connection = config.GetConnection();
+ const auto& connection = config.GetControlPlaneConnection();
settings.Endpoint = connection.GetEndpoint();
settings.EnableSsl = connection.GetUseSsl();
if (connection.GetCertificateFile()) {
@@ -189,26 +191,26 @@ public:
void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig) {
const auto& mapping = cmsConfig.GetDatabaseMapping();
for (const auto& config: mapping.GetCommon()) {
- CommonDatabaseClients.push_back({Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config});
+ CommonDatabaseClients.push_back({Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config});
}
Y_VERIFY(CommonDatabaseClients);
for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
- ScopeToDatabaseClient[scope] = {Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config};
+ ScopeToDatabaseClient[scope] = {Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config};
}
}
void CreateControlPlaneClientActors(const NConfig::TYdbComputeControlPlane::TYdbcp& controlPlaneConfig) {
const auto& mapping = controlPlaneConfig.GetDatabaseMapping();
for (const auto& config: mapping.GetCommon()) {
- CommonDatabaseClients.push_back({Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config});
+ CommonDatabaseClients.push_back({Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config});
}
Y_VERIFY(CommonDatabaseClients);
for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
- ScopeToDatabaseClient[scope] = {Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.connection()))->CreateProvider()).release()), config};
+ ScopeToDatabaseClient[scope] = {Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release()), config};
}
}
@@ -218,7 +220,7 @@ public:
void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) {
if (Config.GetYdb().GetControlPlane().HasSingle()) {
- Register(new TCreateDatabaseRequestActor(TActorId{}, SynchronizationServiceActorId, Config, ev));
+ Register(new TCreateDatabaseRequestActor(TActorId{}, SynchronizationServiceActorId, Config, Config.GetYdb().GetControlPlane().GetSingle().GetConnection(), ev));
return;
}
@@ -226,18 +228,19 @@ public:
auto it = ScopeToDatabaseClient.find(scope);
if (it != ScopeToDatabaseClient.end()) {
FillRequest(ev, it->second.Config);
- Register(new TCreateDatabaseRequestActor(it->second.ActorId, SynchronizationServiceActorId, Config, ev));
+ Register(new TCreateDatabaseRequestActor(it->second.ActorId, SynchronizationServiceActorId, Config, it->second.Config.GetExecutionConnection(), ev));
return;
}
const auto& clientConfig = CommonDatabaseClients[MultiHash(scope) % CommonDatabaseClients.size()];
FillRequest(ev, clientConfig.Config);
- Register(new TCreateDatabaseRequestActor(clientConfig.ActorId, SynchronizationServiceActorId, Config, ev));
+ Register(new TCreateDatabaseRequestActor(clientConfig.ActorId, SynchronizationServiceActorId, Config, clientConfig.Config.GetExecutionConnection(), ev));
}
void FillRequest(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev, const NConfig::TComputeDatabaseConfig& config) {
NYdb::NFq::TScope scope(ev.Get()->Get()->Scope);
- ev.Get()->Get()->BasePath = config.GetConnection().GetDatabase();
- ev.Get()->Get()->Path = config.GetTenant() ? config.GetTenant() + "/" + scope.ParseFolder() : scope.ParseFolder();
+ ev.Get()->Get()->BasePath = config.GetControlPlaneConnection().GetDatabase();
+ const TString databaseName = Config.GetYdb().GetControlPlane().GetDatabasePrefix() + scope.ParseFolder();
+ ev.Get()->Get()->Path = config.GetTenant() ? config.GetTenant() + "/" + databaseName: databaseName;
}
private:
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index c6100a24c6..f2e5b6592f 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -191,17 +191,20 @@ struct TEvYdbCompute {
TEvCreateDatabaseRequest(const TString& cloudId,
const TString& scope,
const TString& basePath,
- const TString& path)
+ const TString& path,
+ const NFq::NConfig::TYdbStorageConfig& executionConnection)
: CloudId(cloudId)
, Scope(scope)
, BasePath(basePath)
, Path(path)
+ , ExecutionConnection(executionConnection)
{}
TString CloudId;
TString Scope;
TString BasePath;
TString Path;
+ NFq::NConfig::TYdbStorageConfig ExecutionConnection;
};
struct TEvCreateDatabaseResponse : public NActors::TEventLocal<TEvCreateDatabaseResponse, EvCreateDatabaseResponse> {
diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto
index 7c9d857a4a..c59b31afee 100644
--- a/ydb/core/fq/libs/config/protos/compute.proto
+++ b/ydb/core/fq/libs/config/protos/compute.proto
@@ -18,7 +18,8 @@ message TInPlaceCompute {
}
message TComputeDatabaseConfig {
- TYdbStorageConfig Connection = 1;
+ TYdbStorageConfig ControlPlaneConnection = 1;
+ TYdbStorageConfig ExecutionConnection = 3;
string Tenant = 2;
}
@@ -46,6 +47,7 @@ message TYdbComputeControlPlane {
TCms Cms = 3;
TYdbcp Ydbcp = 4;
}
+ string DatabasePrefix = 5;
}
message TYdbCompute {