diff options
author | ulya-sidorina <yulia@ydb.tech> | 2022-11-17 13:53:12 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2022-11-17 13:53:12 +0300 |
commit | 8bc98e5d518b5b5cda9f91f28505a3fe746cb45f (patch) | |
tree | 4df63d295323f190f56e2ea4fd526946dfe8b969 | |
parent | 97370e00856dc4e1e46ae93ce827cac6dc235bd7 (diff) | |
download | ydb-8bc98e5d518b5b5cda9f91f28505a3fe746cb45f.tar.gz |
push FlatMap inner precomputes before BuildFlatMapStage
fix(kqp_opt): push precomputes to stage inputs
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 20 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 48 |
2 files changed, 49 insertions, 19 deletions
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 7af04af709..ba0b6b28fa 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3405,6 +3405,26 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(FlatMapLambdaInnerPrecompute) { + auto settings = TKikimrSettings() + .SetEnablePredicateExtractForDataQueries(false); + TKikimrRunner kikimr{settings}; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + $rows = SELECT * FROM KeyValue; + $cnt = SELECT count(*) FROM $rows; + $join = SELECT l.Value AS value FROM $rows as l LEFT JOIN EightShard AS r on l.Key = r.Key; + + $check = SELECT count(*) FROM $join; + SELECT * FROM EightShard WHERE $check = $cnt; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 884744bd11..f650f9b34f 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -302,9 +302,32 @@ TExprBase DqPushMembersFilterToStage(TExprBase node, TExprContext& ctx, IOptimiz return result.Cast(); } +TExprNode::TListType FindLambdaPrecomputes(const TCoLambda& lambda) { + auto predicate = [](const TExprNode::TPtr& node) { + return TMaybeNode<TDqPrecompute>(node).IsValid(); + }; + + TExprNode::TListType result; + VisitExpr(lambda.Body().Ptr(), [&result, &predicate] (const TExprNode::TPtr& node) { + if (predicate(node)) { + result.emplace_back(node); + return false; + } + + return true; + }); + + return result; +} + TMaybeNode<TDqStage> 
DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& flatmap, - TExprNode::TListType&& innerConnections, const TParentsMap& parentsMap, TExprContext& ctx) + TVector<NNodes::TDqConnection>&& innerConnections, const TParentsMap& parentsMap, TExprContext& ctx) { + auto innerPrecomputes = FindLambdaPrecomputes(flatmap.Lambda()); + if (!innerPrecomputes.empty()) { + return {}; + } + TVector<TDqConnection> inputs; TNodeOnNodeOwnedMap replaceMap; @@ -312,15 +335,15 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f inputs.reserve(innerConnections.size() + 1); inputs.push_back(flatmap.Input().Cast<TDqConnection>()); for (auto& cn : innerConnections) { - if (!TMaybeNode<TDqCnUnionAll>(cn).IsValid() && !TMaybeNode<TDqCnMerge>(cn).IsValid()) { + if (!cn.Maybe<TDqCnUnionAll>() && !cn.Maybe<TDqCnMerge>()) { return {}; } - if (!IsSingleConsumerConnection(TDqConnection(cn), parentsMap, true)) { + if (!IsSingleConsumerConnection(cn, parentsMap, true)) { return {}; } - inputs.push_back(TDqConnection(cn)); + inputs.push_back(cn); } auto args = PrepareArgumentsReplacement(flatmap.Input(), inputs, ctx, replaceMap); @@ -401,19 +424,6 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f .Done(); } -TExprNode::TListType FindLambdaConnections(const TCoLambda& lambda) { - auto filter = [](const TExprNode::TPtr& node) { - return !TMaybeNode<TDqPhyPrecompute>(node).IsValid(); - }; - - auto predicate = [](const TExprNode::TPtr& node) { - return TMaybeNode<TDqSource>(node).IsValid() || - TMaybeNode<TDqConnection>(node).IsValid(); - }; - - return FindNodes(lambda.Body().Ptr(), filter, predicate); -} - } // namespace TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& outputIndex, TCoLambda& lambda, @@ -566,7 +576,7 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) { return node; } - auto innerConnections = FindLambdaConnections(flatmap.Lambda()); + auto innerConnections = 
FindDqConnections(flatmap.Lambda()); if (innerConnections.empty()) { return node; } @@ -606,7 +616,7 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo return node; } - auto innerConnections = FindLambdaConnections(flatmap.Lambda()); + auto innerConnections = FindDqConnections(flatmap.Lambda()); TMaybeNode<TDqStage> flatmapStage; if (!innerConnections.empty()) { |