diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-20 15:39:27 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-20 15:39:27 +0300 |
commit | 9f02f8dc82cbb72bff77b003b2ca8d79c52792a3 (patch) | |
tree | 97df8c549df0ffeec7b3dedf621df142774bd774 | |
parent | aac6932b47498fd2f8304a434d80e368484dece6 (diff) | |
download | ydb-9f02f8dc82cbb72bff77b003b2ca8d79c52792a3.tar.gz |
Prepare to enable extract predicate
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/compile_service/kqp_compile_service.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_kql.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 36 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_split_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 6 | ||||
-rw-r--r-- | ydb/core/testlib/basics/feature_flags.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt.cpp | 20 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 19 |
15 files changed, 78 insertions, 55 deletions
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index c66f0817b5f..605f64fdb7c 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -352,6 +352,8 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup(); kqpConfig.EnableKqpDataQueryStreamPointLookup = serviceConfig.GetEnableKqpDataQueryStreamPointLookup(); kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin(); + kqpConfig.EnablePredicateExtractForDataQuery = serviceConfig.GetEnablePredicateExtractForDataQueries(); + kqpConfig.EnablePredicateExtractForScanQuery = serviceConfig.GetEnablePredicateExtractForScanQueries(); } IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 711b9e927bc..0949fc95a1a 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -368,6 +368,9 @@ private: bool enableKqpDataQuerySourceRead = Config.GetEnableKqpDataQuerySourceRead(); bool enableKqpScanQuerySourceRead = Config.GetEnableKqpScanQuerySourceRead(); + bool enableKqpDataQueryPredicateExtract = Config.GetEnablePredicateExtractForDataQueries(); + bool enableKqpScanQueryPredicateExtract = Config.GetEnablePredicateExtractForScanQueries(); + Config.Swap(event.MutableConfig()->MutableTableServiceConfig()); LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config"); @@ -378,7 +381,9 @@ private: Config.GetEnableKqpScanQueryStreamLookup() != enableKqpScanQueryStreamLookup || Config.GetEnableKqpScanQueryStreamIdxLookupJoin() != enableKqpScanQueryStreamIdxLookupJoin || Config.GetEnableKqpDataQuerySourceRead() != enableKqpDataQuerySourceRead || - Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead) { + Config.GetEnableKqpScanQuerySourceRead() != enableKqpScanQuerySourceRead || + Config.GetEnablePredicateExtractForDataQueries() != enableKqpDataQueryPredicateExtract || + Config.GetEnablePredicateExtractForScanQueries() != enableKqpScanQueryPredicateExtract) { LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Iterator read flags was changed. StreamLookup from " << enableKqpDataQueryStreamLookup << diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 957255b6710..1e03a8de445 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -88,11 +88,11 @@ bool UseReadTableRanges(const TKikimrTableDescription& tableData, const TIntrusi return predicateExtractSetting == EOptionalFlag::Enabled; } - if (kqpCtx->IsScanQuery() && kqpCtx->Config->FeatureFlags.GetEnablePredicateExtractForScanQueries()) { + if (kqpCtx->IsScanQuery() && kqpCtx->Config->EnablePredicateExtractForScanQuery) { return true; } - if (kqpCtx->IsDataQuery() && kqpCtx->Config->FeatureFlags.GetEnablePredicateExtractForDataQueries()) { + if (kqpCtx->IsDataQuery() && kqpCtx->Config->EnablePredicateExtractForDataQuery) { return true; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 37f9c2e9e39..107cc3d410b 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -312,6 +312,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp .Done(); } + TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const TParentsMap& parents) { @@ -357,7 +358,12 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, .BuildNode(ctx, read.Pos())) .Done(); } else { - auto connections = FindDqConnections(node); + TVector<TDqConnection> connections; + bool isPure; + FindDqConnections(node, connections, isPure); + if (!isPure) { + return node; + } YQL_ENSURE(!connections.empty()); TVector<TDqConnection> inputs; TVector<TExprBase> stageInputs; @@ -370,8 +376,8 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, return node; } - if (!IsSingleConsumerConnection(input, parents, false)) { - continue; + if (!IsSingleConsumerConnection(input, parents, true)) { + return node; } inputs.push_back(input); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp index 333f78d3f0f..a32ef61a338 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp @@ -176,7 +176,7 @@ TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqp auto input = maybeSkip ? maybeSkip.Cast().Input() : take.Input(); bool isReadTable = input.Maybe<TKqpReadTable>().IsValid(); - bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid(); + bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ; if (!isReadTable && !isReadTableRanges) { return node; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp index a5718063ea2..4b09bf06556 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp @@ -169,7 +169,7 @@ TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TK return KqpRemoveRedundantSortByPkBase(node, ctx, kqpCtx, [&](TExprBase input) -> TMaybe<TTableData> { bool isReadTable = input.Maybe<TKqpReadTable>().IsValid(); - bool isReadTableRanges = input.Maybe<TKqlReadTableRangesBase>().IsValid(); + bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ; if (!isReadTable && !isReadTableRanges) { return Nothing(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index f10ddec1a8d..08127e4c3d8 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -135,6 +135,8 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableKqpDataQueryStreamLookup = false; bool EnableKqpDataQueryStreamPointLookup = false; bool EnableKqpScanQueryStreamIdxLookupJoin = false; + bool EnablePredicateExtractForScanQuery = true; + bool EnablePredicateExtractForDataQuery = false; }; } diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index e4f54b8ba98..81acae2b567 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3273,8 +3273,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(PushFlatmapInnerConnectionsToStageInput) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); auto settings = TKikimrSettings() - .SetEnablePredicateExtractForDataQueries(false); + .SetAppConfig(app); TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3376,8 +3378,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(MultiUsageInnerConnection) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); auto settings = TKikimrSettings() - .SetEnablePredicateExtractForDataQueries(false); + .SetAppConfig(app); TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); @@ -3480,8 +3484,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(FlatmapLambdaMutiusedConnections) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); auto settings = TKikimrSettings() - .SetEnablePredicateExtractForDataQueries(false); + .SetAppConfig(app); TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3526,8 +3532,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { } Y_UNIT_TEST(FlatMapLambdaInnerPrecompute) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); auto settings = TKikimrSettings() - .SetEnablePredicateExtractForDataQueries(false); + .SetAppConfig(app); TKikimrRunner kikimr{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3549,10 +3557,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForDataQueries(true); - settings.SetFeatureFlags(flags); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -3582,9 +3588,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForDataQueries(true); - settings.SetFeatureFlags(flags); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -3605,9 +3609,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForDataQueries(true); - settings.SetFeatureFlags(flags); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -3638,10 +3640,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForDataQueries(true); - settings.SetFeatureFlags(flags); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -3668,10 +3668,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForDataQueries(true); - settings.SetFeatureFlags(flags); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index c5b85b3cab6..f2797e3df87 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -1137,7 +1137,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_UNIT_TEST_TWIN(PrunePartitionsByLiteral, WithPredicatesExtract) { auto cfg = AppCfg(); - cfg.MutableFeatureFlags()->SetEnablePredicateExtractForScanQueries(WithPredicatesExtract); + cfg.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(WithPredicatesExtract); auto kikimr = DefaultKikimrRunner({}, cfg); auto db = kikimr.GetTableClient(); @@ -1979,10 +1979,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -2001,10 +1999,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); @@ -2024,10 +2020,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { TKikimrSettings settings; NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForDataQueries(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForDataQueries(true); - settings.SetFeatureFlags(flags); settings.SetAppConfig(appConfig); TKikimrRunner kikimr(settings); diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index 302090aa6c5..bb977755f08 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -380,10 +380,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnablePredicateExtractForScanQueries(true); settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); settings.SetAppConfig(appConfig); Kikimr.ConstructInPlace(settings); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 96bc141bd9c..0997fb53d81 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -762,13 +762,13 @@ message TFeatureFlags { optional bool EnablePublicApiExternalBlobs = 59 [default = false]; optional bool EnablePublicApiKeepInMemory = 60 [default = false]; optional bool EnableImplicitScanQueryInScripts = 61 [default = true]; - optional bool EnablePredicateExtractForScanQueries = 62 [default = true]; + reserved 62; // EnablePredicateExtractForScanQueries optional bool AllowVDiskDefrag = 63 [default = true]; optional bool EnableAsyncHttpMon = 64 [default = true]; optional bool EnableChangefeeds = 65 [default = true]; reserved 66; // EnableKqpScanQueryStreamLookup optional bool EnableKqpScanQueryMultipleOlapShardsReads = 67 [default = false]; - optional bool EnablePredicateExtractForDataQueries = 68 [default = true]; + reserved 68; // EnablePredicateExtractForDataQueries; reserved 69; // optional bool EnableKqpPatternCacheLiteral = 69 [default = false]; optional bool EnableMoveIndex = 70 [default = true]; // enable http handle for self termination @@ -1270,6 +1270,8 @@ message TTableServiceConfig { optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false]; optional bool EnablePublishKqpProxyByRM = 34 [default = false]; optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false]; + optional bool EnablePredicateExtractForScanQueries = 36 [default = true]; + optional bool EnablePredicateExtractForDataQueries = 37 [default = true]; }; // Config describes immediate controls and allows diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 4f2c8993cbf..a4b36faf0da 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -34,7 +34,6 @@ public: FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables) FEATURE_FLAG_SETTER(EnableChangefeeds) FEATURE_FLAG_SETTER(EnableMoveIndex) - FEATURE_FLAG_SETTER(EnablePredicateExtractForDataQueries) FEATURE_FLAG_SETTER(EnableNotNullDataColumns) FEATURE_FLAG_SETTER(EnableArrowFormatAtDatashard) FEATURE_FLAG_SETTER(EnableGrpcAudit) diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index bd8bfd6abe9..f27f3949f5a 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -133,10 +133,15 @@ ui32 GetStageOutputsCount(const TDqStageBase& stage) { return resultsTypeTuple->GetSize(); } -TVector<TDqConnection> FindDqConnections(const TExprBase& node) { - TVector<TDqConnection> connections; +bool IsDqPureNode(const TExprBase& node) { + return !node.Maybe<TDqSource>() && + !node.Maybe<TDqConnection>() && + !node.Maybe<TDqPrecompute>(); +} - VisitExpr(node.Ptr(), [&connections](const TExprNode::TPtr& exprNode) { +void FindDqConnections(const TExprBase& node, TVector<TDqConnection>& connections, bool& isPure) { + isPure = true; + VisitExpr(node.Ptr(), [&](const TExprNode::TPtr& exprNode) { TExprBase node(exprNode); if (node.Maybe<TDqPhyPrecompute>()) { @@ -149,10 +154,12 @@ TVector<TDqConnection> FindDqConnections(const TExprBase& node) { return false; } + if (!IsDqPureNode(node)) { + isPure = false; + } + return true; }); - - return connections; } bool IsDqPureExpr(const TExprBase& node, bool isPrecomputePure) { @@ -161,8 +168,7 @@ bool IsDqPureExpr(const TExprBase& node, bool isPrecomputePure) { }; auto predicate = [](const TExprNode::TPtr& node) { - return TMaybeNode<TDqSource>(node).IsValid() || - TMaybeNode<TDqConnection>(node).IsValid(); + return !IsDqPureNode(TExprBase(node)); }; if (isPrecomputePure) { diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h index a659a7ddab0..c49a97266ea 100644 --- a/ydb/library/yql/dq/opt/dq_opt.h +++ b/ydb/library/yql/dq/opt/dq_opt.h @@ -47,7 +47,7 @@ bool IsSingleConsumerConnection(const NNodes::TDqConnection& node, const TParent ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage); -TVector<NNodes::TDqConnection> FindDqConnections(const NNodes::TExprBase& node); +void FindDqConnections(const NNodes::TExprBase& node, TVector<NNodes::TDqConnection>&, bool&); bool IsDqPureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true); bool IsDqSelfContainedExpr(const NNodes::TExprBase& node); bool IsDqDependsOnStage(const NNodes::TExprBase& node, const NNodes::TDqStageBase& stage); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index c3f43485571..7a37df04858 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -734,8 +734,10 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) { return node; } - auto innerConnections = FindDqConnections(flatmap.Lambda()); - if (innerConnections.empty()) { + bool isPure; + TVector<TDqConnection> innerConnections; + FindDqConnections(flatmap.Lambda(), innerConnections, isPure); + if (!isPure || innerConnections.empty()) { return node; } @@ -776,7 +778,12 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo return node; } - auto innerConnections = FindDqConnections(flatmap.Lambda()); + bool isPure; + TVector<TDqConnection> innerConnections; + FindDqConnections(flatmap.Lambda(), innerConnections, isPure); + if (!isPure) { + return node; + } TMaybeNode<TDqStage> flatmapStage; if (!innerConnections.empty()) { @@ -2634,15 +2641,19 @@ TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) { TExprNode::TListType newArgs; TNodeOnNodeOwnedMap replaces; + TNodeOnNodeOwnedMap inputsReplaces; + for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { newInputs.push_back(stage.Inputs().Item(i).Ptr()); auto arg = stage.Program().Args().Arg(i).Raw(); newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content())); replaces[arg] = newArgs.back(); + + inputsReplaces[arg] = stage.Inputs().Item(i).Ptr(); } for (auto& precompute: innerPrecomputes) { - newInputs.push_back(precompute); + newInputs.push_back(ctx.ReplaceNodes(TExprNode::TPtr(precompute), inputsReplaces)); newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size())); replaces[precompute.Get()] = newArgs.back(); } |