diff options
author | hcpp <hcpp@ydb.tech> | 2023-08-08 16:46:07 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-08-08 17:43:48 +0300 |
commit | f95fc3633ff92fc77fbc6220aa9b3653d0df412e (patch) | |
tree | 93f70bbc451b02cd45765a890fa84962bcae023d | |
parent | b2b56c1d3c5356e928b9bff7fa43a5b338f99d21 (diff) | |
download | ydb-f95fc3633ff92fc77fbc6220aa9b3653d0df412e.tar.gz |
pin for yqv2
16 files changed, 71 insertions, 20 deletions
diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h index f735dcdeeb..a332189cb2 100644 --- a/ydb/core/fq/libs/compute/common/config.h +++ b/ydb/core/fq/libs/compute/common/config.h @@ -1,15 +1,22 @@ #pragma once #include <ydb/core/fq/libs/config/protos/compute.pb.h> +#include <ydb/core/fq/libs/protos/fq_private.pb.h> #include <util/digest/multi.h> #include <util/generic/algorithm.h> +#include <util/generic/algorithm.h> +#include <util/generic/vector.h> #include <util/generic/yexception.h> namespace NFq { class TComputeConfig { public: + explicit TComputeConfig() + : TComputeConfig({}) + {} + explicit TComputeConfig(const NFq::NConfig::TComputeConfig& computeConfig) : ComputeConfig(computeConfig) , DefaultCompute(ComputeConfig.GetDefaultCompute() != NFq::NConfig::EComputeType::UNKNOWN @@ -48,6 +55,19 @@ public: return DefaultCompute; } + TVector<TString> GetPinTenantNames(FederatedQuery::QueryContent::QueryType queryType, const TString& scope) const { + NFq::NConfig::EComputeType computeType = GetComputeType(queryType, scope); + switch (computeType) { + case NFq::NConfig::EComputeType::YDB: + return TVector<TString>{ComputeConfig.GetYdb().GetPinTenantName().begin(), ComputeConfig.GetYdb().GetPinTenantName().end()}; + case NFq::NConfig::EComputeType::IN_PLACE: + case NFq::NConfig::EComputeType::UNKNOWN: + case NFq::NConfig::EComputeType_INT_MIN_SENTINEL_DO_NOT_USE_: + case NFq::NConfig::EComputeType_INT_MAX_SENTINEL_DO_NOT_USE_: + return TVector<TString>{}; + } + } + NFq::NConfig::TYdbStorageConfig GetConnection(const TString& scope) const { const auto& controlPlane = ComputeConfig.GetYdb().GetControlPlane(); switch (controlPlane.type_case()) { diff --git a/ydb/core/fq/libs/config/protos/compute.proto b/ydb/core/fq/libs/config/protos/compute.proto index 2166d7d1b0..7c9d857a4a 100644 --- a/ydb/core/fq/libs/config/protos/compute.proto +++ b/ydb/core/fq/libs/config/protos/compute.proto @@ -52,6 +52,7 @@ message TYdbCompute { bool Enable = 1; TYdbComputeControlPlane ControlPlane = 2; TSynchronizationService SynchronizationService = 3; + repeated string PinTenantName = 4; } enum EComputeType { diff --git a/ydb/core/fq/libs/control_plane_config/control_plane_config.cpp b/ydb/core/fq/libs/control_plane_config/control_plane_config.cpp index ce474bda42..78c289694f 100644 --- a/ydb/core/fq/libs/control_plane_config/control_plane_config.cpp +++ b/ydb/core/fq/libs/control_plane_config/control_plane_config.cpp @@ -34,17 +34,19 @@ class TControlPlaneConfigActor : public NActors::TActorBootstrapped<TControlPlan NDbPool::TDbPool::TPtr DbPool; ::NMonitoring::TDynamicCounterPtr Counters; NConfig::TControlPlaneStorageConfig Config; + NConfig::TComputeConfig ComputeConfig; TTenantInfo::TPtr TenantInfo; bool LoadInProgress = false; TDuration DbReloadPeriod; TString TablePathPrefix; public: - TControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters) + TControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const NConfig::TComputeConfig& computeConfig, const ::NMonitoring::TDynamicCounterPtr& counters) : YqSharedResources(yqSharedResources) , CredProviderFactory(credProviderFactory) , Counters(counters) , Config(config) + , ComputeConfig(computeConfig) { DbReloadPeriod = GetDuration(Config.GetDbReloadPeriod(), TDuration::Seconds(3)); } @@ -60,7 +62,7 @@ public: TablePathPrefix = YdbConnection->TablePathPrefix; Schedule(TDuration::Zero(), new NActors::TEvents::TEvWakeup()); } else { - TenantInfo.reset(new TTenantInfo()); + TenantInfo.reset(new TTenantInfo(ComputeConfig)); const auto& mapping = Config.GetMapping(); for (const auto& cloudToTenant : mapping.GetCloudIdToTenantName()) { TenantInfo->SubjectMapping[SUBJECT_TYPE_CLOUD].emplace(cloudToTenant.GetKey(), cloudToTenant.GetValue()); @@ -108,7 +110,7 @@ private: LoadInProgress = true; TDbExecutable::TPtr executable; - auto& executer = TTenantExecuter::Create(executable, true, [](TTenantExecuter& executer) { executer.State.reset(new TTenantInfo()); } ); + auto& executer = TTenantExecuter::Create(executable, true, [computeConfig=ComputeConfig](TTenantExecuter& executer) { executer.State.reset(new TTenantInfo(computeConfig)); } ); executer.Read( [=](TTenantExecuter&, TSqlQueryBuilder& builder) { @@ -242,8 +244,12 @@ TActorId ControlPlaneConfigActorId() { return NActors::TActorId(0, name); } -NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters) { - return new TControlPlaneConfigActor(yqSharedResources, credProviderFactory, config, counters); +NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, + const NConfig::TControlPlaneStorageConfig& config, + const NConfig::TComputeConfig& computeConfig, + const ::NMonitoring::TDynamicCounterPtr& counters) { + return new TControlPlaneConfigActor(yqSharedResources, credProviderFactory, config, computeConfig, counters); } } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_config/control_plane_config.h b/ydb/core/fq/libs/control_plane_config/control_plane_config.h index 74065bfe97..f81188dd56 100644 --- a/ydb/core/fq/libs/control_plane_config/control_plane_config.h +++ b/ydb/core/fq/libs/control_plane_config/control_plane_config.h @@ -23,6 +23,10 @@ namespace NFq { NActors::TActorId ControlPlaneConfigActorId(); -NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const NConfig::TControlPlaneStorageConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters); +NActors::IActor* CreateControlPlaneConfigActor(const ::NFq::TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, + const NConfig::TControlPlaneStorageConfig& config, + const NConfig::TComputeConfig& computeConfig, + const ::NMonitoring::TDynamicCounterPtr& counters); } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_config/events/events.h b/ydb/core/fq/libs/control_plane_config/events/events.h index 20f75253fa..ae9fb4733b 100644 --- a/ydb/core/fq/libs/control_plane_config/events/events.h +++ b/ydb/core/fq/libs/control_plane_config/events/events.h @@ -2,6 +2,7 @@ #include <ydb/core/fq/libs/events/event_subspace.h> #include <ydb/core/fq/libs/quota_manager/events/events.h> +#include <ydb/core/fq/libs/compute/common/config.h> #include <ydb/public/api/protos/draft/fq.pb.h> @@ -30,9 +31,21 @@ struct TTenantInfo { THashMap<TString /* vtenant */, TString /* tenant */> TenantMapping; THashMap<TString /* tenant */, ui32 /* state */> TenantState; TInstant StateTime; + NFq::TComputeConfig ComputeConfig; + + TTenantInfo() = default; + + TTenantInfo(const NFq::NConfig::TComputeConfig& computeConfig) + : ComputeConfig(computeConfig) + {} // this method must be thread safe - TString Assign(const TString& cloudId, const TString& /* scope */, const TString& DefaultTenantName = "") const { + TString Assign(const TString& cloudId, const TString& scope, FederatedQuery::QueryContent::QueryType queryType, const TString& DefaultTenantName = "") const { + auto pinTenants = ComputeConfig.GetPinTenantNames(queryType, scope); + if (pinTenants) { + return pinTenants[MultiHash(cloudId) % pinTenants.size()]; + } + auto it = SubjectMapping.find(SUBJECT_TYPE_CLOUD); auto vTenant = it == SubjectMapping.end() ? "" : it->second.Value(cloudId, ""); if (!vTenant && CommonVTenants.size()) { diff --git a/ydb/core/fq/libs/control_plane_proxy/config.cpp b/ydb/core/fq/libs/control_plane_proxy/config.cpp index 20cb82e9df..df443fae4d 100644 --- a/ydb/core/fq/libs/control_plane_proxy/config.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/config.cpp @@ -30,7 +30,7 @@ TControlPlaneProxyConfig::TControlPlaneProxyConfig( const NConfig::TCommonConfig& commonConfig, const NYql::TS3GatewayConfig& s3Config) : Proto(FillDefaultParameters(config)) - , StorageConfig(TControlPlaneStorageConfig(storageConfig, s3Config, commonConfig)) + , StorageConfig(TControlPlaneStorageConfig(storageConfig, s3Config, commonConfig, {})) , ComputeConfig(computeConfig) , CommonConfig(commonConfig) , RequestTimeout(GetDuration(Proto.GetRequestTimeout(), TDuration::Seconds(30))) diff --git a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp index 04c5fd1495..000bc9d6f5 100644 --- a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp @@ -405,7 +405,7 @@ private: ); auto configService = CreateControlPlaneConfigActor(NFq::TYqSharedResources::TPtr{}, NKikimr::TYdbCredentialsProviderFactory(nullptr), - NConfig::TControlPlaneStorageConfig{}, MakeIntrusive<::NMonitoring::TDynamicCounters>()); + NConfig::TControlPlaneStorageConfig{}, NConfig::TComputeConfig{}, MakeIntrusive<::NMonitoring::TDynamicCounters>()); runtime->AddLocalService( NFq::ControlPlaneConfigActorId(), TActorSetupCmd(configService, TMailboxType::Simple, 0), diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp index 36651db8f1..9661bdedf1 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.cpp +++ b/ydb/core/fq/libs/control_plane_storage/config.cpp @@ -19,8 +19,9 @@ FederatedQuery::BindingSetting::BindingCase GetBindingType(const TString& typeSt } -TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common) +TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto) : Proto(FillDefaultParameters(config)) + , ComputeConfigProto(computeConfigProto) , IdsPrefix(common.GetIdsPrefix()) , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10))) , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1))) diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h index dd45eb6484..fc4fcf4893 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.h +++ b/ydb/core/fq/libs/control_plane_storage/config.h @@ -15,6 +15,7 @@ namespace NFq { struct TControlPlaneStorageConfig { NConfig::TControlPlaneStorageConfig Proto; + NConfig::TComputeConfig ComputeConfigProto; TString IdsPrefix; TDuration IdempotencyKeyTtl; TDuration AutomaticQueriesTtl; @@ -30,7 +31,7 @@ struct TControlPlaneStorageConfig { TDuration QuotaTtl; TDuration MetricsTtl; - TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common); + TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const NConfig::TComputeConfig& computeConfigProto); }; } // NFq diff --git a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h index ef01e0deae..f04b0b2ed1 100644 --- a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h +++ b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h @@ -46,6 +46,7 @@ NActors::IActor* CreateYdbControlPlaneStorageServiceActor( const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, + const NConfig::TComputeConfig& computeConfig, const ::NMonitoring::TDynamicCounterPtr& counters, const NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp index a778a751f8..fea916f03a 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp @@ -221,7 +221,7 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam } if (tenantInfo) { - auto tenant = tenantInfo->Assign(taskInternal.Task.Internal.cloud_id(), task.Scope, taskInternal.TenantName); + auto tenant = tenantInfo->Assign(taskInternal.Task.Internal.cloud_id(), task.Scope, taskInternal.Task.Query.content().type(), taskInternal.TenantName); if (tenant != taskInternal.TenantName) { // mapping changed, reassign tenant taskInternal.ShouldSkipTask = true; diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 2a9aa170e6..10e32f5302 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -634,11 +634,12 @@ NActors::IActor* CreateYdbControlPlaneStorageServiceActor( const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, + const NConfig::TComputeConfig& computeConfig, const ::NMonitoring::TDynamicCounterPtr& counters, const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TString& tenantName) { - return new TYdbControlPlaneStorageActor(config, s3Config, common, counters, yqSharedResources, credentialsProviderFactory, tenantName); + return new TYdbControlPlaneStorageActor(config, s3Config, common, computeConfig, counters, yqSharedResources, credentialsProviderFactory, tenantName); } } // NFq diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index fb67be852b..6984be80f2 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -293,8 +293,9 @@ protected: TControlPlaneStorageUtils( const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, - const NConfig::TCommonConfig& common) - : Config(std::make_shared<::NFq::TControlPlaneStorageConfig>(config, s3Config, common)) + const NConfig::TCommonConfig& common, + const NConfig::TComputeConfig& computeConfig) + : Config(std::make_shared<::NFq::TControlPlaneStorageConfig>(config, s3Config, common, computeConfig)) { } @@ -594,11 +595,12 @@ public: const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, + const NConfig::TComputeConfig& computeConfig, const ::NMonitoring::TDynamicCounterPtr& counters, const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const TString& tenantName) - : TControlPlaneStorageUtils(config, s3Config, common) + : TControlPlaneStorageUtils(config, s3Config, common, computeConfig) , Counters(counters, *Config) , YqSharedResources(yqSharedResources) , CredProviderFactory(credProviderFactory) diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index fb59318076..7b5a824879 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -104,7 +104,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Streaming disposition \"from_last_checkpoint\" is not allowed in CreateQuery request")); } - auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, TenantName); + auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, request.content().type(), TenantName); if (event.Quotas) { TQuotaMap::const_iterator it = event.Quotas->end(); @@ -760,7 +760,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery issues.AddIssue(MakeErrorIssue(TIssuesIds::UNSUPPORTED, "State load mode \"FROM_LAST_CHECKPOINT\" is not supported")); } - auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, TenantName); + auto tenant = ev->Get()->TenantInfo->Assign(cloudId, scope, request.content().type(), TenantName); if (issues) { CPS_LOG_W("ModifyQueryRequest: {" << request.DebugString() << "} " << MakeUserInfo(user, token) << "validation FAILED: " << issues.ToOneLineString()); diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 4eca7d21bd..caad686453 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -84,6 +84,7 @@ void Init( protoConfig.GetControlPlaneStorage(), protoConfig.GetGateways().GetS3(), protoConfig.GetCommon(), + protoConfig.GetCompute(), yqCounters->GetSubgroup("subsystem", "ControlPlaneStorage"), yqSharedResources, NKikimr::CreateYdbCredentialsProviderFactory, @@ -92,7 +93,7 @@ void Init( actorRegistrator(NFq::ControlPlaneConfigActorId(), CreateControlPlaneConfigActor(yqSharedResources, NKikimr::CreateYdbCredentialsProviderFactory, protoConfig.GetControlPlaneStorage(), - yqCounters->GetSubgroup("subsystem", "ControlPlaneConfig")) + protoConfig.GetCompute(), yqCounters->GetSubgroup("subsystem", "ControlPlaneConfig")) ); } diff --git a/ydb/core/fq/libs/test_connection/test_connection.cpp b/ydb/core/fq/libs/test_connection/test_connection.cpp index 8e8733461f..d2c2fb1ba9 100644 --- a/ydb/core/fq/libs/test_connection/test_connection.cpp +++ b/ydb/core/fq/libs/test_connection/test_connection.cpp @@ -122,7 +122,7 @@ public: const NYql::IHTTPGateway::TPtr& httpGateway, const ::NMonitoring::TDynamicCounterPtr& counters) : Config(config) - , ControlPlaneStorageConfig(controlPlaneStorageConfig, s3Config, commonConfig) + , ControlPlaneStorageConfig(controlPlaneStorageConfig, s3Config, commonConfig, {}) , CommonConfig(commonConfig) , SharedResouces(sharedResources) , CredentialsFactory(credentialsFactory) |