aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2022-11-17 13:53:12 +0300
committerulya-sidorina <yulia@ydb.tech>2022-11-17 13:53:12 +0300
commit8bc98e5d518b5b5cda9f91f28505a3fe746cb45f (patch)
tree4df63d295323f190f56e2ea4fd526946dfe8b969
parent97370e00856dc4e1e46ae93ce827cac6dc235bd7 (diff)
downloadydb-8bc98e5d518b5b5cda9f91f28505a3fe746cb45f.tar.gz
push FlatMap inner precomputes before BuildFlatMapStage
fix(kqp_opt): push precomputs to stage inputs
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp20
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp48
2 files changed, 49 insertions, 19 deletions
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index 7af04af709..ba0b6b28fa 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3405,6 +3405,26 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ Y_UNIT_TEST(FlatMapLambdaInnerPrecompute) {
+ auto settings = TKikimrSettings()
+ .SetEnablePredicateExtractForDataQueries(false);
+ TKikimrRunner kikimr{settings};
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ $rows = SELECT * FROM KeyValue;
+ $cnt = SELECT count(*) FROM $rows;
+ $join = SELECT l.Value AS value FROM $rows as l LEFT JOIN EightShard AS r on l.Key = r.Key;
+
+ $check = SELECT count(*) FROM $join;
+ SELECT * FROM EightShard WHERE $check = $cnt;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 884744bd11..f650f9b34f 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -302,9 +302,32 @@ TExprBase DqPushMembersFilterToStage(TExprBase node, TExprContext& ctx, IOptimiz
return result.Cast();
}
+TExprNode::TListType FindLambdaPrecomputes(const TCoLambda& lambda) {
+ auto predicate = [](const TExprNode::TPtr& node) {
+ return TMaybeNode<TDqPrecompute>(node).IsValid();
+ };
+
+ TExprNode::TListType result;
+ VisitExpr(lambda.Body().Ptr(), [&result, &predicate] (const TExprNode::TPtr& node) {
+ if (predicate(node)) {
+ result.emplace_back(node);
+ return false;
+ }
+
+ return true;
+ });
+
+ return result;
+}
+
TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& flatmap,
- TExprNode::TListType&& innerConnections, const TParentsMap& parentsMap, TExprContext& ctx)
+ TVector<NNodes::TDqConnection>&& innerConnections, const TParentsMap& parentsMap, TExprContext& ctx)
{
+ auto innerPrecomputes = FindLambdaPrecomputes(flatmap.Lambda());
+ if (!innerPrecomputes.empty()) {
+ return {};
+ }
+
TVector<TDqConnection> inputs;
TNodeOnNodeOwnedMap replaceMap;
@@ -312,15 +335,15 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f
inputs.reserve(innerConnections.size() + 1);
inputs.push_back(flatmap.Input().Cast<TDqConnection>());
for (auto& cn : innerConnections) {
- if (!TMaybeNode<TDqCnUnionAll>(cn).IsValid() && !TMaybeNode<TDqCnMerge>(cn).IsValid()) {
+ if (!cn.Maybe<TDqCnUnionAll>() && !cn.Maybe<TDqCnMerge>()) {
return {};
}
- if (!IsSingleConsumerConnection(TDqConnection(cn), parentsMap, true)) {
+ if (!IsSingleConsumerConnection(cn, parentsMap, true)) {
return {};
}
- inputs.push_back(TDqConnection(cn));
+ inputs.push_back(cn);
}
auto args = PrepareArgumentsReplacement(flatmap.Input(), inputs, ctx, replaceMap);
@@ -401,19 +424,6 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f
.Done();
}
-TExprNode::TListType FindLambdaConnections(const TCoLambda& lambda) {
- auto filter = [](const TExprNode::TPtr& node) {
- return !TMaybeNode<TDqPhyPrecompute>(node).IsValid();
- };
-
- auto predicate = [](const TExprNode::TPtr& node) {
- return TMaybeNode<TDqSource>(node).IsValid() ||
- TMaybeNode<TDqConnection>(node).IsValid();
- };
-
- return FindNodes(lambda.Body().Ptr(), filter, predicate);
-}
-
} // namespace
TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& outputIndex, TCoLambda& lambda,
@@ -566,7 +576,7 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
return node;
}
- auto innerConnections = FindLambdaConnections(flatmap.Lambda());
+ auto innerConnections = FindDqConnections(flatmap.Lambda());
if (innerConnections.empty()) {
return node;
}
@@ -606,7 +616,7 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
return node;
}
- auto innerConnections = FindLambdaConnections(flatmap.Lambda());
+ auto innerConnections = FindDqConnections(flatmap.Lambda());
TMaybeNode<TDqStage> flatmapStage;
if (!innerConnections.empty()) {