summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <[email protected]>2023-03-19 14:35:26 +0300
committerhor911 <[email protected]>2023-03-19 14:35:26 +0300
commit47866ecfec6a044227776f817bcd230b5ab59328 (patch)
treefab9bc89770d038c5bd3c983f77747b4989b71db
parent3885d4050256290d9222a68f91e032f5c02c9a2a (diff)
Use Block Arrow Parquet Reader in Scalar MKQL Mode
-rw-r--r--ydb/core/fq/libs/init/init.cpp4
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto3
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp13
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.h2
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp4
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp6
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h2
11 files changed, 29 insertions, 13 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index fecebed55aa..635f11ad797 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -185,6 +185,10 @@ void Init(
readActorFactoryCfg.FileSizeLimit =
protoConfig.GetGateways().GetS3().GetFileSizeLimit();
}
+ if (protoConfig.GetGateways().GetS3().HasBlockFileSizeLimit()) {
+ readActorFactoryCfg.BlockFileSizeLimit =
+ protoConfig.GetGateways().GetS3().GetBlockFileSizeLimit();
+ }
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index b4ce6859832..3bd0cbbe640 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -377,7 +377,8 @@ message TS3GatewayConfig {
repeated TS3ClusterConfig ClusterMapping = 1;
optional uint64 FileSizeLimit = 2; // Global limit
- repeated TS3FormatSizeLimit FormatSizeLimit = 6; // Format limits
+ repeated TS3FormatSizeLimit FormatSizeLimit = 6; // Format limits (override FileSizeLimit)
+ optional uint64 BlockFileSizeLimit = 10; // Global limit for block readers (overrides FormatSizeLimit)
optional uint64 MaxFilesPerQuery = 3;
optional uint64 MaxReadSizePerQuery = 4;
optional uint64 MaxDiscoveryFilesPerQuery = 5;
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index cbb44b135f8..5c1dee9c0c3 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -18,10 +18,9 @@ using namespace NYql::NNodes;
class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
public:
- TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks)
+ TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config)
: TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {})
, Config(config)
- , UseBlocks(useBlocks)
{
const bool enablePrecompute = Config->_EnablePrecompute.Get().GetOrElse(false);
@@ -129,7 +128,7 @@ protected:
inputType->Cast<TStructExprType>()->FindItem("_yql_block_length").Defined();
auto wideWrap = ctx.Builder(node.Pos())
- .Callable(UseBlocks && supportsBlocks ? TDqSourceWideBlockWrap::CallableName() : TDqSourceWideWrap::CallableName())
+ .Callable(supportsBlocks ? TDqSourceWideBlockWrap::CallableName() : TDqSourceWideWrap::CallableName())
.Add(0, sourceArg)
.Add(1, wrap.DataSource().Ptr())
.Add(2, wrap.RowType().Ptr())
@@ -142,8 +141,7 @@ protected:
})
.Seal()
.Build();
-
- if (UseBlocks && supportsBlocks) {
+ if (supportsBlocks) {
wideWrap = ctx.Builder(node.Pos())
.Callable("WideFromBlocks")
.Add(0, wideWrap)
@@ -383,11 +381,10 @@ protected:
private:
TDqConfiguration::TPtr Config;
- const bool UseBlocks;
};
-THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks) {
- return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config, useBlocks));
+THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) {
+ return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config));
}
} // NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.h b/ydb/library/yql/providers/dq/opt/physical_optimize.h
index ab20f334966..5ed9da98807 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.h
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.h
@@ -8,6 +8,6 @@
namespace NYql::NDqs {
-THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config, bool useBlocks);
+THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index 094c786fb53..9e99ccdceb1 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -31,7 +31,7 @@ public:
TDqDataProviderSink(const TDqState::TPtr& state)
: State(state)
, LogOptTransformer([state] () { return CreateDqsLogOptTransformer(state->TypeCtx, state->Settings); })
- , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings, state->TypeCtx->UseBlocks ); })
+ , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO*/nullptr, state->Settings); })
, PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); })
, TypeAnnotationTransformer([state] () {
return CreateDqsDataSinkTypeAnnotationTransformer(
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 9871f32b9a6..03bdd97ed7c 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1967,7 +1967,7 @@ private:
try {
if (ReadSpec->Arrow) {
if (ReadSpec->Compression) {
- Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression, use Pragma DisableUseBlocks"));
+ Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression"));
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
} else {
try {
@@ -2827,6 +2827,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
readSpec->ParallelRowGroupCount = std::max(1ul, params.GetParallelRowGroupCount());
readSpec->RowGroupReordering = params.GetRowGroupReordering();
if (readSpec->Arrow) {
+ fileSizeLimit = cfg.BlockFileSizeLimit;
arrow::SchemaBuilder builder;
const TStringBuf blockLengthColumn("_yql_block_length"sv);
auto extraStructType = static_cast<TStructType*>(pb->NewStructType(structType, blockLengthColumn,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 00f5434248a..f70b13a96d8 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -18,6 +18,7 @@ struct TS3ReadActorFactoryConfig {
ui64 MaxInflight = 20;
ui64 DataInflight = 200_MB;
ui64 FileSizeLimit = 2_GB;
+ ui64 BlockFileSizeLimit = 50_GB;
std::unordered_map<TString, ui64> FormatSizeLimits;
};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 412f3e4d145..64c359caa4c 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -182,7 +182,7 @@ public:
auto format = s3ReadObject.Object().Format().Ref().Content();
if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") {
bool supportedArrowTypes = false;
- if (State_->Types->UseBlocks && State_->Types->ArrowResolver) {
+ if (State_->Configuration->UseBlocksSource.Get().GetOrElse(State_->Types->UseBlocks) && State_->Types->ArrowResolver) {
TVector<const TTypeAnnotationNode*> allTypes;
for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
allTypes.push_back(x->GetItemType());
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index af12022dea0..8c2b7c8b1b2 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -12,6 +12,7 @@
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <util/generic/size_literals.h>
namespace NYql {
@@ -231,6 +232,9 @@ public:
fileSizeLimit = it->second;
}
}
+ if (formatName == "parquet" && State_->Configuration->UseBlocksSource.Get().GetOrElse(State_->Types->UseBlocks)) {
+ fileSizeLimit = State_->Configuration->BlockFileSizeLimit;
+ }
for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) {
TStringBuf packed = batch.Data().Literal().Value();
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index 580dddf2813..4a66190e238 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -18,6 +18,7 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, ArrowThreadPool);
REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1);
REGISTER_SETTING(*this, ArrowRowGroupReordering);
+ REGISTER_SETTING(*this, UseBlocksSource);
}
TS3Settings::TConstPtr TS3Configuration::Snapshot() const {
@@ -36,6 +37,7 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA
}
}
FileSizeLimit = config.HasFileSizeLimit() ? config.GetFileSizeLimit() : 2_GB;
+ BlockFileSizeLimit = config.HasBlockFileSizeLimit() ? config.GetBlockFileSizeLimit() : 50_GB;
MaxFilesPerQuery = config.HasMaxFilesPerQuery() ? config.GetMaxFilesPerQuery() : 7000;
MaxDiscoveryFilesPerQuery = config.HasMaxDiscoveryFilesPerQuery() ? config.GetMaxDiscoveryFilesPerQuery() : 9000;
MaxDirectoriesAndFilesPerQuery = config.HasMaxDirectoriesAndFilesPerQuery() ? config.GetMaxDirectoriesAndFilesPerQuery() : 9000;
@@ -51,6 +53,10 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA
this->SetValidClusters(clusters);
this->Dispatch(config.GetDefaultSettings());
+ if (typeCtx->UseBlocks) {
+ YQL_ENSURE(UseBlocksSource.Get().GetOrElse(true), "Scalar Source is not compatible with Blocks engine");
+ }
+
for (const auto& cluster: config.GetClusterMapping()) {
this->Dispatch(cluster.GetName(), cluster.GetSettings());
auto& settings = Clusters[cluster.GetName()];
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index f7db3ee905d..ac247080745 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -20,6 +20,7 @@ struct TS3Settings {
NCommon::TConfSetting<bool, false> ArrowThreadPool;
NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1
NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK
+ NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode
};
struct TS3ClusterSettings {
@@ -41,6 +42,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher
std::unordered_map<TString, TS3ClusterSettings> Clusters;
ui64 FileSizeLimit;
+ ui64 BlockFileSizeLimit;
std::unordered_map<TString, ui64> FormatSizeLimits;
ui64 MaxFilesPerQuery;
ui64 MaxDiscoveryFilesPerQuery;