diff options
author | hor911 <[email protected]> | 2023-03-19 14:35:26 +0300 |
---|---|---|
committer | hor911 <[email protected]> | 2023-03-19 14:35:26 +0300 |
commit | 47866ecfec6a044227776f817bcd230b5ab59328 (patch) | |
tree | fab9bc89770d038c5bd3c983f77747b4989b71db | |
parent | 3885d4050256290d9222a68f91e032f5c02c9a2a (diff) |
Use Block Arrow Parquet Reader in Scalar MKQL Mode
11 files changed, 29 insertions, 13 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index fecebed55aa..635f11ad797 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -185,6 +185,10 @@ void Init( readActorFactoryCfg.FileSizeLimit = protoConfig.GetGateways().GetS3().GetFileSizeLimit(); } + if (protoConfig.GetGateways().GetS3().HasBlockFileSizeLimit()) { + readActorFactoryCfg.BlockFileSizeLimit = + protoConfig.GetGateways().GetS3().GetBlockFileSizeLimit(); + } RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index b4ce6859832..3bd0cbbe640 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -377,7 +377,8 @@ message TS3GatewayConfig { repeated TS3ClusterConfig ClusterMapping = 1; optional uint64 FileSizeLimit = 2; // Global limit - repeated TS3FormatSizeLimit FormatSizeLimit = 6; // Format limits + repeated TS3FormatSizeLimit FormatSizeLimit = 6; // Format limits (override FileSizeLimit) + optional uint64 BlockFileSizeLimit = 10; // Global limit for block readers (overrides FormatSizeLimit) optional uint64 MaxFilesPerQuery = 3; optional uint64 MaxReadSizePerQuery = 4; optional uint64 MaxDiscoveryFilesPerQuery = 5; diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index cbb44b135f8..5c1dee9c0c3 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -18,10 +18,9 @@ using namespace NYql::NNodes; class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { public: - TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks) + TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) : TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {}) , Config(config) - , UseBlocks(useBlocks) { const bool enablePrecompute = Config->_EnablePrecompute.Get().GetOrElse(false); @@ -129,7 +128,7 @@ protected: inputType->Cast<TStructExprType>()->FindItem("_yql_block_length").Defined(); auto wideWrap = ctx.Builder(node.Pos()) - .Callable(UseBlocks && supportsBlocks ? TDqSourceWideBlockWrap::CallableName() : TDqSourceWideWrap::CallableName()) + .Callable(supportsBlocks ? TDqSourceWideBlockWrap::CallableName() : TDqSourceWideWrap::CallableName()) .Add(0, sourceArg) .Add(1, wrap.DataSource().Ptr()) .Add(2, wrap.RowType().Ptr()) @@ -142,8 +141,7 @@ protected: }) .Seal() .Build(); - - if (UseBlocks && supportsBlocks) { + if (supportsBlocks) { wideWrap = ctx.Builder(node.Pos()) .Callable("WideFromBlocks") .Add(0, wideWrap) @@ -383,11 +381,10 @@ protected: private: TDqConfiguration::TPtr Config; - const bool UseBlocks; }; -THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks) { - return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config, useBlocks)); +THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) { + return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config)); } } // NYql::NDqs diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.h b/ydb/library/yql/providers/dq/opt/physical_optimize.h index ab20f334966..5ed9da98807 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.h +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.h @@ -8,6 +8,6 @@ namespace NYql::NDqs { -THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks); +THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index 094c786fb53..9e99ccdceb1 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -31,7 +31,7 @@ public: TDqDataProviderSink(const TDqState::TPtr& state) : State(state) , LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); }) - , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings, state->TypeCtx->UseBlocks ); }) + , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); }) , PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); }) , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer( diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 9871f32b9a6..03bdd97ed7c 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1967,7 +1967,7 @@ private: try { if (ReadSpec->Arrow) { if (ReadSpec->Compression) { - Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression, use Pragma DisableUseBlocks")); + Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression")); fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; } else { try { @@ -2827,6 +2827,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( readSpec->ParallelRowGroupCount = std::max(1ul, params.GetParallelRowGroupCount()); readSpec->RowGroupReordering = params.GetRowGroupReordering(); if (readSpec->Arrow) { + fileSizeLimit = cfg.BlockFileSizeLimit; arrow::SchemaBuilder builder; const TStringBuf blockLengthColumn("_yql_block_length"sv); auto extraStructType = static_cast<TStructType*>(pb->NewStructType(structType, blockLengthColumn, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 00f5434248a..f70b13a96d8 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -18,6 +18,7 @@ struct TS3ReadActorFactoryConfig { ui64 MaxInflight = 20; ui64 DataInflight = 200_MB; ui64 FileSizeLimit = 2_GB; + ui64 BlockFileSizeLimit = 50_GB; std::unordered_map<TString, ui64> FormatSizeLimits; }; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 412f3e4d145..64c359caa4c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -182,7 +182,7 @@ public: auto format = s3ReadObject.Object().Format().Ref().Content(); if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") { bool supportedArrowTypes = false; - if (State_->Types->UseBlocks && State_->Types->ArrowResolver) { + if (State_->Configuration->UseBlocksSource.Get().GetOrElse(State_->Types->UseBlocks) && State_->Types->ArrowResolver) { TVector<const TTypeAnnotationNode*> allTypes; for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) { allTypes.push_back(x->GetItemType()); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index af12022dea0..8c2b7c8b1b2 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -12,6 +12,7 @@ #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/utils/log/log.h> +#include <util/generic/size_literals.h> namespace NYql { @@ -231,6 +232,9 @@ public: fileSizeLimit = it->second; } } + if (formatName == "parquet" && State_->Configuration->UseBlocksSource.Get().GetOrElse(State_->Types->UseBlocks)) { + fileSizeLimit = State_->Configuration->BlockFileSizeLimit; + } for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) { TStringBuf packed = batch.Data().Literal().Value(); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 580dddf2813..4a66190e238 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -18,6 +18,7 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, ArrowThreadPool); REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1); REGISTER_SETTING(*this, ArrowRowGroupReordering); + REGISTER_SETTING(*this, UseBlocksSource); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { @@ -36,6 +37,7 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA } } FileSizeLimit = config.HasFileSizeLimit() ? config.GetFileSizeLimit() : 2_GB; + BlockFileSizeLimit = config.HasBlockFileSizeLimit() ? config.GetBlockFileSizeLimit() : 50_GB; MaxFilesPerQuery = config.HasMaxFilesPerQuery() ? config.GetMaxFilesPerQuery() : 7000; MaxDiscoveryFilesPerQuery = config.HasMaxDiscoveryFilesPerQuery() ? config.GetMaxDiscoveryFilesPerQuery() : 9000; MaxDirectoriesAndFilesPerQuery = config.HasMaxDirectoriesAndFilesPerQuery() ? config.GetMaxDirectoriesAndFilesPerQuery() : 9000; @@ -51,6 +53,10 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA this->SetValidClusters(clusters); this->Dispatch(config.GetDefaultSettings()); + if (typeCtx->UseBlocks) { + YQL_ENSURE(UseBlocksSource.Get().GetOrElse(true), "Scalar Source is not compatible with Blocks engine"); + } + for (const auto& cluster: config.GetClusterMapping()) { this->Dispatch(cluster.GetName(), cluster.GetSettings()); auto& settings = Clusters[cluster.GetName()]; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index f7db3ee905d..ac247080745 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -20,6 +20,7 @@ struct TS3Settings { NCommon::TConfSetting<bool, false> ArrowThreadPool; NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1 NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK + NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode }; struct TS3ClusterSettings { @@ -41,6 +42,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher std::unordered_map<TString, TS3ClusterSettings> Clusters; ui64 FileSizeLimit; + ui64 BlockFileSizeLimit; std::unordered_map<TString, ui64> FormatSizeLimits; ui64 MaxFilesPerQuery; ui64 MaxDiscoveryFilesPerQuery; |