aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <auzhegov@yandex-team.com>2023-04-10 12:00:58 +0300
committerauzhegov <auzhegov@yandex-team.com>2023-04-10 12:00:58 +0300
commit189e2024b0bc3940365e535cea36610a6a0e3098 (patch)
tree69fb66028e98b98e5d51a4fb632303be8eb22796
parentc9e595c28aa6046c69f1b03c4b3583e3f0c642b2 (diff)
downloadydb-189e2024b0bc3940365e535cea36610a6a0e3098.tar.gz
Added extra configuration for path generator
Added new configuration
-rw-r--r--ydb/core/fq/libs/control_plane_storage/config.cpp5
-rw-r--r--ydb/core/fq/libs/control_plane_storage/config.h4
-rw-r--r--ydb/core/fq/libs/control_plane_storage/control_plane_storage.h1
-rw-r--r--ydb/core/fq/libs/control_plane_storage/request_validators.cpp8
-rw-r--r--ydb/core/fq/libs/control_plane_storage/request_validators.h6
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp3
-rw-r--r--ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h8
-rw-r--r--ydb/core/fq/libs/init/init.cpp2
-rw-r--r--ydb/core/fq/libs/test_connection/test_connection.cpp13
-rw-r--r--ydb/core/fq/libs/test_connection/test_connection.h1
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto1
-rw-r--r--ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h6
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp27
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp6
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h1
16 files changed, 70 insertions, 24 deletions
diff --git a/ydb/core/fq/libs/control_plane_storage/config.cpp b/ydb/core/fq/libs/control_plane_storage/config.cpp
index 5d0c4222d4..36651db8f1 100644
--- a/ydb/core/fq/libs/control_plane_storage/config.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/config.cpp
@@ -19,7 +19,7 @@ FederatedQuery::BindingSetting::BindingCase GetBindingType(const TString& typeSt
}
-TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common)
+TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common)
: Proto(FillDefaultParameters(config))
, IdsPrefix(common.GetIdsPrefix())
, IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10)))
@@ -39,6 +39,9 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl
AvailableBindings.insert(GetBindingType(availableBinding));
}
+ GeneratorPathsLimit =
+ s3Config.HasGeneratorPathsLimit() ? s3Config.GetGeneratorPathsLimit() : 50'000;
+
for (const auto& mapping : Proto.GetRetryPolicyMapping()) {
auto& retryPolicy = mapping.GetPolicy();
auto retryCount = retryPolicy.GetRetryCount();
diff --git a/ydb/core/fq/libs/control_plane_storage/config.h b/ydb/core/fq/libs/control_plane_storage/config.h
index 1dc0c7f3b3..dd45eb6484 100644
--- a/ydb/core/fq/libs/control_plane_storage/config.h
+++ b/ydb/core/fq/libs/control_plane_storage/config.h
@@ -4,6 +4,7 @@
#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/config/protos/control_plane_storage.pb.h>
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/public/api/protos/draft/fq.pb.h>
#include <util/datetime/base.h>
@@ -23,12 +24,13 @@ struct TControlPlaneStorageConfig {
TDuration TaskLeaseTtl;
TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableConnections;
TSet<FederatedQuery::BindingSetting::BindingCase> AvailableBindings;
+ ui64 GeneratorPathsLimit;
THashMap<ui64, TRetryPolicyItem> RetryPolicies;
TRetryPolicyItem TaskLeaseRetryPolicy;
TDuration QuotaTtl;
TDuration MetricsTtl;
- TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common);
+ TControlPlaneStorageConfig(const NConfig::TControlPlaneStorageConfig& config, const NYql::TS3GatewayConfig& s3Config, const NConfig::TCommonConfig& common);
};
} // NFq
diff --git a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h
index 264c0c1872..790886c1ae 100644
--- a/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h
+++ b/ydb/core/fq/libs/control_plane_storage/control_plane_storage.h
@@ -40,6 +40,7 @@ NActors::IActor* CreateInMemoryControlPlaneStorageServiceActor(const NConfig::TC
NActors::IActor* CreateYdbControlPlaneStorageServiceActor(
const NConfig::TControlPlaneStorageConfig& config,
+ const NYql::TS3GatewayConfig& s3Config,
const NConfig::TCommonConfig& common,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NFq::TYqSharedResources::TPtr& yqSharedResources,
diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
index a0b12382aa..82768fc6fb 100644
--- a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
@@ -315,8 +315,12 @@ NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, co
return issues;
}
-NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy) {
- auto generator =NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs
+NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit) {
+ auto generator = NYql::NPathGenerator::CreatePathGenerator(
+ projection,
+ partitionedBy,
+ GetDataSlotColumns(schema),
+ pathsLimit); // an exception is thrown if an error occurs
TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns;
for (const auto& column: generator->GetConfig().Rules) {
projectionColumns[column.Name] = column.Type;
diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.h b/ydb/core/fq/libs/control_plane_storage/request_validators.h
index 526d22662a..132a8a91b2 100644
--- a/ydb/core/fq/libs/control_plane_storage/request_validators.h
+++ b/ydb/core/fq/libs/control_plane_storage/request_validators.h
@@ -81,10 +81,10 @@ NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobu
NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);
NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, const TVector<TString>& partitionedBy);
-NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy);
+NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit);
template<typename T>
-NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQuery::BindingSetting::BindingCase>& availableBindings)
+NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQuery::BindingSetting::BindingCase>& availableBindings, size_t pathsLimit)
{
const auto& request = ev->Get()->Request;
NYql::TIssues issues = ValidateEvent(ev, maxSize);
@@ -138,7 +138,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
}
projectionStr = projection.ToJsonPretty();
}
- issues.AddIssues(ValidateProjection(subset.schema(), projectionStr, partitionedBy));
+ issues.AddIssues(ValidateProjection(subset.schema(), projectionStr, partitionedBy, pathsLimit));
} catch (...) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,CurrentExceptionMessage()));
}
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index b738fd6d07..b5e4636e13 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -619,12 +619,13 @@ TAsyncStatus TDbRequester::ReadModifyWrite(
NActors::IActor* CreateYdbControlPlaneStorageServiceActor(
const NConfig::TControlPlaneStorageConfig& config,
+ const NYql::TS3GatewayConfig& s3Config,
const NConfig::TCommonConfig& common,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NFq::TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TString& tenantName) {
- return new TYdbControlPlaneStorageActor(config, common, counters, yqSharedResources, credentialsProviderFactory, tenantName);
+ return new TYdbControlPlaneStorageActor(config, s3Config, common, counters, yqSharedResources, credentialsProviderFactory, tenantName);
}
} // NFq
diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index b11ab11502..f8f9ad6e46 100644
--- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -283,8 +283,9 @@ class TControlPlaneStorageUtils {
protected:
TControlPlaneStorageUtils(
const NConfig::TControlPlaneStorageConfig& config,
+ const NYql::TS3GatewayConfig& s3Config,
const NConfig::TCommonConfig& common)
- : Config(std::make_shared<::NFq::TControlPlaneStorageConfig>(config, common))
+ : Config(std::make_shared<::NFq::TControlPlaneStorageConfig>(config, s3Config, common))
{
}
@@ -309,7 +310,7 @@ protected:
template<typename T>
NYql::TIssues ValidateBinding(T& ev)
{
- return ::NFq::ValidateBinding<T>(ev, Config->Proto.GetMaxRequestSize(), Config->AvailableBindings);
+ return ::NFq::ValidateBinding<T>(ev, Config->Proto.GetMaxRequestSize(), Config->AvailableBindings, Config->GeneratorPathsLimit);
}
template<typename T>
@@ -570,12 +571,13 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
public:
TYdbControlPlaneStorageActor(
const NConfig::TControlPlaneStorageConfig& config,
+ const NYql::TS3GatewayConfig& s3Config,
const NConfig::TCommonConfig& common,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NFq::TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
const TString& tenantName)
- : TControlPlaneStorageUtils(config, common)
+ : TControlPlaneStorageUtils(config, s3Config, common)
, Counters(counters, *Config)
, YqSharedResources(yqSharedResources)
, CredProviderFactory(credProviderFactory)
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index 635f11ad79..0002c927c6 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -81,6 +81,7 @@ void Init(
? NFq::CreateInMemoryControlPlaneStorageServiceActor(protoConfig.GetControlPlaneStorage())
: NFq::CreateYdbControlPlaneStorageServiceActor(
protoConfig.GetControlPlaneStorage(),
+ protoConfig.GetGateways().GetS3(),
protoConfig.GetCommon(),
yqCounters->GetSubgroup("subsystem", "ControlPlaneStorage"),
yqSharedResources,
@@ -275,6 +276,7 @@ void Init(
auto testConnection = NFq::CreateTestConnectionActor(
protoConfig.GetTestConnection(),
protoConfig.GetControlPlaneStorage(),
+ protoConfig.GetGateways().GetS3(),
protoConfig.GetCommon(),
protoConfig.GetTokenAccessor(),
yqSharedResources,
diff --git a/ydb/core/fq/libs/test_connection/test_connection.cpp b/ydb/core/fq/libs/test_connection/test_connection.cpp
index 675c3fc0df..1f4a3bc90f 100644
--- a/ydb/core/fq/libs/test_connection/test_connection.cpp
+++ b/ydb/core/fq/libs/test_connection/test_connection.cpp
@@ -111,6 +111,7 @@ public:
TTestConnectionActor(
const NConfig::TTestConnectionConfig& config,
const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig,
+ const NYql::TS3GatewayConfig& s3Config,
const NConfig::TCommonConfig& commonConfig,
const NConfig::TTokenAccessorConfig& tokenAccessorConfig,
const NFq::TYqSharedResources::TPtr& sharedResources,
@@ -120,7 +121,7 @@ public:
const NYql::IHTTPGateway::TPtr& httpGateway,
const ::NMonitoring::TDynamicCounterPtr& counters)
: Config(config)
- , ControlPlaneStorageConfig(controlPlaneStorageConfig, commonConfig)
+ , ControlPlaneStorageConfig(controlPlaneStorageConfig, s3Config, commonConfig)
, CommonConfig(commonConfig)
, SharedResouces(sharedResources)
, CredentialsFactory(credentialsFactory)
@@ -230,6 +231,7 @@ NActors::TActorId TestConnectionActorId() {
NActors::IActor* CreateTestConnectionActor(
const NConfig::TTestConnectionConfig& config,
const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig,
+ const NYql::TS3GatewayConfig& s3Config,
const NConfig::TCommonConfig& commonConfig,
const NConfig::TTokenAccessorConfig& tokenAccessorConfig,
const NFq::TYqSharedResources::TPtr& sharedResources,
@@ -238,10 +240,11 @@ NActors::IActor* CreateTestConnectionActor(
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const NYql::IHTTPGateway::TPtr& httpGateway,
const ::NMonitoring::TDynamicCounterPtr& counters) {
- return new TTestConnectionActor(config, controlPlaneStorageConfig, commonConfig,
- tokenAccessorConfig, sharedResources,
- credentialsFactory, cmConnections,
- functionRegistry, httpGateway, counters);
+ return new TTestConnectionActor(config, controlPlaneStorageConfig,
+ s3Config, commonConfig,
+ tokenAccessorConfig, sharedResources,
+ credentialsFactory, cmConnections,
+ functionRegistry, httpGateway, counters);
}
} // namespace NFq
diff --git a/ydb/core/fq/libs/test_connection/test_connection.h b/ydb/core/fq/libs/test_connection/test_connection.h
index af9e4eb7d2..5b89f6e648 100644
--- a/ydb/core/fq/libs/test_connection/test_connection.h
+++ b/ydb/core/fq/libs/test_connection/test_connection.h
@@ -44,6 +44,7 @@ NActors::TActorId TestConnectionActorId();
NActors::IActor* CreateTestConnectionActor(
const NConfig::TTestConnectionConfig& config,
const NConfig::TControlPlaneStorageConfig& controlPlaneStorageConfig,
+ const NYql::TS3GatewayConfig& s3Config,
const NConfig::TCommonConfig& commonConfig,
const NConfig::TTokenAccessorConfig& tokenAccessorConfig,
const NFq::TYqSharedResources::TPtr& sharedResources,
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 9c40b776f3..0243818df4 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -389,6 +389,7 @@ message TS3GatewayConfig {
optional uint64 ListingCallbackThreadCount = 12;
optional uint64 ListingCallbackPerThreadQueueSize = 13;
optional uint64 RegexpCacheSize = 14;
+ optional uint64 GeneratorPathsLimit = 15;
repeated TAttr DefaultSettings = 100;
}
diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
index b462249515..089349cd90 100644
--- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
+++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
@@ -72,6 +72,10 @@ struct IPathGenerator {
using TPathGeneratorPtr = std::shared_ptr<const IPathGenerator>;
-TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 1'000'000);
+TPathGeneratorPtr CreatePathGenerator(
+ const TString& projection,
+ const std::vector<TString>& partitionedBy,
+ const TMap<TString, NUdf::EDataSlot>& columns = {},
+ size_t pathsLimit = 50000);
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 8ba263eb5e..89939e8f28 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -114,8 +114,12 @@ public:
return true;
}
- bool ValidateProjection(const TString& projection, const std::vector<TString>& partitionedBy) {
- auto generator = NPathGenerator::CreatePathGenerator(projection, partitionedBy, DataSlotColumns);
+ bool ValidateProjection(
+ const TString& projection,
+ const std::vector<TString>& partitionedBy,
+ size_t pathsLimit) {
+ auto generator = NPathGenerator::CreatePathGenerator(
+ projection, partitionedBy, DataSlotColumns, pathsLimit);
TMap<TString, NPathGenerator::IPathGenerator::EType> projectionColumns;
for (const auto& column: generator->GetConfig().Rules) {
projectionColumns[column.Name] = column.Type;
@@ -224,8 +228,13 @@ private:
TMap<TString, NUdf::EDataSlot> DataSlotColumns;
};
-
-bool ValidateProjectionTypes(const TStructExprType* columnsType, const TString& projection, const std::vector<TString>& partitionedBy, const TExprNode::TPtr& input, TExprContext& ctx) {
+bool ValidateProjectionTypes(
+ const TStructExprType* columnsType,
+ const TString& projection,
+ const std::vector<TString>& partitionedBy,
+ const TExprNode::TPtr& input,
+ TExprContext& ctx,
+ size_t pathsLimit) {
if (!columnsType) {
return true;
}
@@ -242,7 +251,7 @@ bool ValidateProjectionTypes(const TStructExprType* columnsType, const TString&
}
try {
- if (!typeValidator.ValidateProjection(projection, partitionedBy)) {
+ if (!typeValidator.ValidateProjection(projection, partitionedBy, pathsLimit)) {
return false;
}
} catch (...) {
@@ -434,7 +443,13 @@ public:
}
}
- if (!ValidateProjectionTypes(rowType->Cast<TStructExprType>(), projection, partitionedBy, input, ctx)) {
+ if (!ValidateProjectionTypes(
+ rowType->Cast<TStructExprType>(),
+ projection,
+ partitionedBy,
+ input,
+ ctx,
+ State_->Configuration->GeneratorPathsLimit)) {
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 006ce7669b..9c0e6049fb 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -721,7 +721,11 @@ private:
config.Columns = partitionedBy;
config.SchemaTypeNode = schema->ChildPtr(1);
if (!projection.empty()) {
- config.Generator = CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(*schema, ctx));
+ config.Generator = CreatePathGenerator(
+ projection,
+ partitionedBy,
+ GetDataSlotColumns(*schema, ctx),
+ State_->Configuration->GeneratorPathsLimit);
if (!ValidateProjection(projectionPos, config.Generator, partitionedBy, ctx)) {
return false;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index 59342d9fb4..c1199d8565 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -64,6 +64,8 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA
RegexpCacheSize = config.HasRegexpCacheSize() ? config.GetRegexpCacheSize() : 100;
AllowConcurrentListings =
config.HasAllowConcurrentListings() ? config.GetAllowConcurrentListings() : false;
+ GeneratorPathsLimit =
+ config.HasGeneratorPathsLimit() ? config.GetGeneratorPathsLimit() : 50'000;
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index d129bdf16f..2744af335d 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -57,6 +57,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher
ui64 RegexpCacheSize;
bool AllowLocalFiles;
bool AllowConcurrentListings;
+ ui64 GeneratorPathsLimit;
};
} // NYql