diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-04-10 12:00:58 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-04-10 12:00:58 +0300 |
commit | 189e2024b0bc3940365e535cea36610a6a0e3098 (patch) | |
tree | 69fb66028e98b98e5d51a4fb632303be8eb22796 | |
parent | c9e595c28aa6046c69f1b03c4b3583e3f0c642b2 (diff) | |
download | ydb-189e2024b0bc3940365e535cea36610a6a0e3098.tar.gz |
Added extra configuration for path generator
Added new configuration
16 files changed, 70 insertions, 24 deletions
diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp index 5d0c4222d4..36651db8f1 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.cpp +++ b/ydb/core/fq/libs/control_plane_storage/config.cpp @@ -19,7 +19,7 @@ FederatedQuery::BindingSetting::BindingCase GetBindingType(const TString& typeSt } -TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common) +TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common) : Proto(FillDefaultParameters(config)) , IdsPrefix(common.GetIdsPrefix()) , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10))) @@ -39,6 +39,9 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl AvailableBindings.insert(GetBindingType(availableBinding)); } + GeneratorPathsLimit = + s3Config.HasGeneratorPathsLimit() ? s3Config.GetGeneratorPathsLimit() : 50'000; + for (const auto& mapping : Proto.GetRetryPolicyMapping()) { auto& retryPolicy = mapping.GetPolicy(); auto retryCount = retryPolicy.GetRetryCount(); diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h index 1dc0c7f3b3..dd45eb6484 100644 --- a/ydb/core/fq/libs/control_plane_storage/config.h +++ b/ydb/core/fq/libs/control_plane_storage/config.h @@ -4,6 +4,7 @@ #include <ydb/core/fq/libs/config/protos/common.pb.h> #include <ydb/core/fq/libs/config/protos/control_plane_storage.pb.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/public/api/protos/draft/fq.pb.h> #include <util/datetime/base.h> @@ -23,12 +24,13 @@ struct TControlPlaneStorageConfig { TDuration TaskLeaseTtl; TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableConnections; TSet<FederatedQuery::BindingSetting::BindingCase> AvailableBindings; + ui64 GeneratorPathsLimit; THashMap<ui64, TRetryPolicyItem> RetryPolicies; TRetryPolicyItem TaskLeaseRetryPolicy; TDuration QuotaTtl; TDuration MetricsTtl; - TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common); + TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common); }; } // NFq diff --git a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h index 264c0c1872..790886c1ae 100644 --- a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h +++ b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h @@ -40,6 +40,7 @@ NActors::IActor* CreateInMemoryControlPlaneStorageServiceActor(const NConfig::TC NActors::IActor* CreateYdbControlPlaneStorageServiceActor( const NConfig::TControlPlaneStorageConfig& config, + const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const ::NMonitoring::TDynamicCounterPtr& counters, const NFq::TYqSharedResources::TPtr& yqSharedResources, diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp index a0b12382aa..82768fc6fb 100644 --- a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp +++ b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp @@ -315,8 +315,12 @@ NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, co return issues; } -NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy) { - auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs +NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit) { + auto generator = NYql::NPathGenerator::CreatePathGenerator( + projection, + partitionedBy, + GetDataSlotColumns(schema), + pathsLimit); // an exception is thrown if an error occurs TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns; for (const auto& column: generator->GetConfig().Rules) { projectionColumns[column.Name] = column.Type; diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.h b/ydb/core/fq/libs/control_plane_storage/request_validators.h index 526d22662a..132a8a91b2 100644 --- a/ydb/core/fq/libs/control_plane_storage/request_validators.h +++ b/ydb/core/fq/libs/control_plane_storage/request_validators.h @@ -81,10 +81,10 @@ NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobu NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false); NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, const TVector<TString>& partitionedBy); -NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy); +NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit); template<typename T> -NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQuery::BindingSetting::BindingCase>& availableBindings) +NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQuery::BindingSetting::BindingCase>& availableBindings, size_t pathsLimit) { const auto& request = ev->Get()->Request; NYql::TIssues issues = ValidateEvent(ev, maxSize); @@ -138,7 +138,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ } projectionStr = projection.ToJsonPretty(); } - issues.AddIssues(ValidateProjection(subset.schema(), projectionStr, partitionedBy)); + issues.AddIssues(ValidateProjection(subset.schema(), projectionStr, partitionedBy, pathsLimit)); } catch (...) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,CurrentExceptionMessage())); } diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp index b738fd6d07..b5e4636e13 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -619,12 +619,13 @@ TAsyncStatus TDbRequester::ReadModifyWrite( NActors::IActor* CreateYdbControlPlaneStorageServiceActor( const NConfig::TControlPlaneStorageConfig& config, + const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const ::NMonitoring::TDynamicCounterPtr& counters, const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TString& tenantName) { - return new TYdbControlPlaneStorageActor(config, common, counters, yqSharedResources, credentialsProviderFactory, tenantName); + return new TYdbControlPlaneStorageActor(config, s3Config, common, counters, yqSharedResources, credentialsProviderFactory, tenantName); } } // NFq diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index b11ab11502..f8f9ad6e46 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -283,8 +283,9 @@ class TControlPlaneStorageUtils { protected: TControlPlaneStorageUtils( const NConfig::TControlPlaneStorageConfig& config, + const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common) - : Config(std::make_shared<::NFq::TControlPlaneStorageConfig>(config, common)) + : Config(std::make_shared<::NFq::TControlPlaneStorageConfig>(config, s3Config, common)) { } @@ -309,7 +310,7 @@ protected: template<typename T> NYql::TIssues ValidateBinding(T& ev) { - return ::NFq::ValidateBinding<T>(ev, Config->Proto.GetMaxRequestSize(), Config->AvailableBindings); + return ::NFq::ValidateBinding<T>(ev, Config->Proto.GetMaxRequestSize(), Config->AvailableBindings, Config->GeneratorPathsLimit); } template<typename T> @@ -570,12 +571,13 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont public: TYdbControlPlaneStorageActor( const NConfig::TControlPlaneStorageConfig& config, + const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common, const ::NMonitoring::TDynamicCounterPtr& counters, const ::NFq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory, const TString& tenantName) - : TControlPlaneStorageUtils(config, common) + : TControlPlaneStorageUtils(config, s3Config, common) , Counters(counters, *Config) , YqSharedResources(yqSharedResources) , CredProviderFactory(credProviderFactory) diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 635f11ad79..0002c927c6 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -81,6 +81,7 @@ void Init( ? NFq::CreateInMemoryControlPlaneStorageServiceActor(protoConfig.GetControlPlaneStorage()) : NFq::CreateYdbControlPlaneStorageServiceActor( protoConfig.GetControlPlaneStorage(), + protoConfig.GetGateways().GetS3(), protoConfig.GetCommon(), yqCounters->GetSubgroup("subsystem", "ControlPlaneStorage"), yqSharedResources, @@ -275,6 +276,7 @@ void Init( auto testConnection = NFq::CreateTestConnectionActor( protoConfig.GetTestConnection(), protoConfig.GetControlPlaneStorage(), + protoConfig.GetGateways().GetS3(), protoConfig.GetCommon(), protoConfig.GetTokenAccessor(), yqSharedResources, diff --git a/ydb/core/fq/libs/test_connection/test_connection.cpp b/ydb/core/fq/libs/test_connection/test_connection.cpp index 675c3fc0df..1f4a3bc90f 100644 --- a/ydb/core/fq/libs/test_connection/test_connection.cpp +++ b/ydb/core/fq/libs/test_connection/test_connection.cpp @@ -111,6 +111,7 @@ public: TTestConnectionActor( const NConfig::TTestConnectionConfig& config, const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig, + const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& commonConfig, const NConfig::TTokenAccessorConfig& tokenAccessorConfig, const NFq::TYqSharedResources::TPtr& sharedResources, @@ -120,7 +121,7 @@ public: const NYql::IHTTPGateway::TPtr& httpGateway, const ::NMonitoring::TDynamicCounterPtr& counters) : Config(config) - , ControlPlaneStorageConfig(controlPlaneStorageConfig, commonConfig) + , ControlPlaneStorageConfig(controlPlaneStorageConfig, s3Config, commonConfig) , CommonConfig(commonConfig) , SharedResouces(sharedResources) , CredentialsFactory(credentialsFactory) @@ -230,6 +231,7 @@ NActors::TActorId TestConnectionActorId() { NActors::IActor* CreateTestConnectionActor( const NConfig::TTestConnectionConfig& config, const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig, + const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& commonConfig, const NConfig::TTokenAccessorConfig& tokenAccessorConfig, const NFq::TYqSharedResources::TPtr& sharedResources, @@ -238,10 +240,11 @@ NActors::IActor* CreateTestConnectionActor( const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const NYql::IHTTPGateway::TPtr& httpGateway, const ::NMonitoring::TDynamicCounterPtr& counters) { - return new TTestConnectionActor(config, controlPlaneStorageConfig, commonConfig, - tokenAccessorConfig, sharedResources, - credentialsFactory, cmConnections, - functionRegistry, httpGateway, counters); + return new TTestConnectionActor(config, controlPlaneStorageConfig, + s3Config, commonConfig, + tokenAccessorConfig, sharedResources, + credentialsFactory, cmConnections, + functionRegistry, httpGateway, counters); } } // namespace NFq diff --git a/ydb/core/fq/libs/test_connection/test_connection.h b/ydb/core/fq/libs/test_connection/test_connection.h index af9e4eb7d2..5b89f6e648 100644 --- a/ydb/core/fq/libs/test_connection/test_connection.h +++ b/ydb/core/fq/libs/test_connection/test_connection.h @@ -44,6 +44,7 @@ NActors::TActorId TestConnectionActorId(); NActors::IActor* CreateTestConnectionActor( const NConfig::TTestConnectionConfig& config, const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig, + const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& commonConfig, const NConfig::TTokenAccessorConfig& tokenAccessorConfig, const NFq::TYqSharedResources::TPtr& sharedResources, diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 9c40b776f3..0243818df4 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -389,6 +389,7 @@ message TS3GatewayConfig { optional uint64 ListingCallbackThreadCount = 12; optional uint64 ListingCallbackPerThreadQueueSize = 13; optional uint64 RegexpCacheSize = 14; + optional uint64 GeneratorPathsLimit = 15; repeated TAttr DefaultSettings = 100; } diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h index b462249515..089349cd90 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h @@ -72,6 +72,10 @@ struct IPathGenerator { using TPathGeneratorPtr = std::shared_ptr<const IPathGenerator>; -TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 1'000'000); +TPathGeneratorPtr CreatePathGenerator( + const TString& projection, + const std::vector<TString>& partitionedBy, + const TMap<TString, NUdf::EDataSlot>& columns = {}, + size_t pathsLimit = 50000); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 8ba263eb5e..89939e8f28 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -114,8 +114,12 @@ public: return true; } - bool ValidateProjection(const TString& projection, const std::vector<TString>& partitionedBy) { - auto generator = NPathGenerator::CreatePathGenerator(projection, partitionedBy, DataSlotColumns); + bool ValidateProjection( + const TString& projection, + const std::vector<TString>& partitionedBy, + size_t pathsLimit) { + auto generator = NPathGenerator::CreatePathGenerator( + projection, partitionedBy, DataSlotColumns, pathsLimit); TMap<TString, NPathGenerator::IPathGenerator::EType> projectionColumns; for (const auto& column: generator->GetConfig().Rules) { projectionColumns[column.Name] = column.Type; @@ -224,8 +228,13 @@ private: TMap<TString, NUdf::EDataSlot> DataSlotColumns; }; - -bool ValidateProjectionTypes(const TStructExprType* columnsType, const TString& projection, const std::vector<TString>& partitionedBy, const TExprNode::TPtr& input, TExprContext& ctx) { +bool ValidateProjectionTypes( + const TStructExprType* columnsType, + const TString& projection, + const std::vector<TString>& partitionedBy, + const TExprNode::TPtr& input, + TExprContext& ctx, + size_t pathsLimit) { if (!columnsType) { return true; } @@ -242,7 +251,7 @@ bool ValidateProjectionTypes(const TStructExprType* columnsType, const TString& } try { - if (!typeValidator.ValidateProjection(projection, partitionedBy)) { + if (!typeValidator.ValidateProjection(projection, partitionedBy, pathsLimit)) { return false; } } catch (...) { @@ -434,7 +443,13 @@ public: } } - if (!ValidateProjectionTypes(rowType->Cast<TStructExprType>(), projection, partitionedBy, input, ctx)) { + if (!ValidateProjectionTypes( + rowType->Cast<TStructExprType>(), + projection, + partitionedBy, + input, + ctx, + State_->Configuration->GeneratorPathsLimit)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 006ce7669b..9c0e6049fb 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -721,7 +721,11 @@ private: config.Columns = partitionedBy; config.SchemaTypeNode = schema->ChildPtr(1); if (!projection.empty()) { - config.Generator = CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(*schema, ctx)); + config.Generator = CreatePathGenerator( + projection, + partitionedBy, + GetDataSlotColumns(*schema, ctx), + State_->Configuration->GeneratorPathsLimit); if (!ValidateProjection(projectionPos, config.Generator, partitionedBy, ctx)) { return false; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 59342d9fb4..c1199d8565 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -64,6 +64,8 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA RegexpCacheSize = config.HasRegexpCacheSize() ? config.GetRegexpCacheSize() : 100; AllowConcurrentListings = config.HasAllowConcurrentListings() ? config.GetAllowConcurrentListings() : false; + GeneratorPathsLimit = + config.HasGeneratorPathsLimit() ? config.GetGeneratorPathsLimit() : 50'000; TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index d129bdf16f..2744af335d 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -57,6 +57,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher ui64 RegexpCacheSize; bool AllowLocalFiles; bool AllowConcurrentListings; + ui64 GeneratorPathsLimit; }; } // NYql |