summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <[email protected]>2022-09-12 19:11:55 +0300
committerspuchin <[email protected]>2022-09-12 19:11:55 +0300
commit6a8a1de1b435c3ca6014dd3b2e9dd37b5dbf929d (patch)
tree3fb797b827fbc5dcc5cc7d3dad4279861d199fd8
parent4200bcde8524e1f131a5e6af0ec7e051c82cf807 (diff)
Phy plan for pure Flatmap with lambda connection. ()
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp7
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp37
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp62
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h2
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp5
5 files changed, 103 insertions, 10 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 223c276354a..9444349f134 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -38,6 +38,7 @@ public:
AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength));
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
+ AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage));
@@ -166,6 +167,12 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> BuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
+ TExprBase output = DqBuildPureFlatmapStage(node, ctx);
+ DumpAppliedRule("BuildPureFlatmapStage", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
template <bool IsGlobal>
TMaybeNode<TExprBase> BuildFlatmapStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index 4157c7af6e5..d30d3b2656a 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3347,6 +3347,43 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[[1];[101u];["Value1"]]])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ Y_UNIT_TEST(PushPureFlatmapInnerConnectionsToStage) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto params = kikimr.GetTableClient().GetParamsBuilder()
+ .AddParam("$rows").BeginList()
+ .AddListItem()
+ .BeginStruct()
+ .AddMember("Name").String("Name1")
+ .AddMember("Value2").String("Value22")
+ .AddMember("Data").String("Data1")
+ .EndStruct()
+ .EndList()
+ .Build()
+ .Build();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+
+ DECLARE $rows AS List<Struct<
+ Name: String,
+ Value2: String,
+ Data: String>>;
+
+ $values =
+ SELECT (Name, Value2) FROM Join2
+ WHERE Key1 = 101;
+
+ SELECT * FROM AS_TABLE($rows)
+ WHERE (Name, Value2) IN COMPACT $values;
+ )", TTxControl::BeginTx().CommitTx(), params).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([["Data1";"Name1";"Value22"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index b3faaa51c3d..ca9bcead839 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -297,6 +297,19 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f
.Done();
}
+TExprNode::TListType FindLambdaConnections(const TCoLambda& lambda) {
+ auto filter = [](const TExprNode::TPtr& node) {
+ return !TMaybeNode<TDqPhyPrecompute>(node).IsValid();
+ };
+
+ auto predicate = [](const TExprNode::TPtr& node) {
+ return TMaybeNode<TDqSource>(node).IsValid() ||
+ TMaybeNode<TDqConnection>(node).IsValid();
+ };
+
+ return FindNodes(lambda.Body().Ptr(), filter, predicate);
+}
+
} // namespace
TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& outputIndex, TCoLambda& lambda,
@@ -438,6 +451,44 @@ TExprBase DqPushExtractMembersToStage(TExprBase node, TExprContext& ctx, IOptimi
return DqPushMembersFilterToStage<TCoExtractMembers>(node, ctx, optCtx, parentsMap, allowStageMultiUsage);
}
+TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
+ if (!node.Maybe<TCoFlatMapBase>()) {
+ return node;
+ }
+
+ auto flatmap = node.Cast<TCoFlatMapBase>();
+
+ if (!IsDqPureExpr(flatmap.Input())) {
+ return node;
+ }
+
+ auto innerConnections = FindLambdaConnections(flatmap.Lambda());
+ if (innerConnections.empty()) {
+ return node;
+ }
+
+ auto inputStage = Build<TDqStage>(ctx, flatmap.Input().Pos())
+ .Inputs()
+ .Build()
+ .Program()
+ .Args({})
+ .Body<TCoIterator>()
+ .List(flatmap.Input())
+ .Build()
+ .Build()
+ .Settings(TDqStageSettings().BuildNode(ctx, flatmap.Input().Pos()))
+ .Done();
+
+ auto inputConnection = Build<TDqCnUnionAll>(ctx, flatmap.Pos())
+ .Output()
+ .Stage(inputStage)
+ .Index().Build("0")
+ .Build()
+ .Done();
+
+ return TExprBase(ctx.ChangeChild(flatmap.Ref(), TCoFlatMapBase::idx_Input, inputConnection.Ptr()));
+}
+
TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
@@ -451,16 +502,7 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
return node;
}
- auto filter = [](const TExprNode::TPtr& node) {
- return !TMaybeNode<TDqPhyPrecompute>(node).IsValid();
- };
-
- auto predicate = [](const TExprNode::TPtr& node) {
- return TMaybeNode<TDqSource>(node).IsValid() ||
- TMaybeNode<TDqConnection>(node).IsValid();
- };
-
- auto innerConnections = FindNodes(flatmap.Lambda().Body().Ptr(), filter, predicate);
+ auto innerConnections = FindLambdaConnections(flatmap.Lambda());
TMaybeNode<TDqStage> flatmapStage;
if (!innerConnections.empty()) {
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index e4d0d50a52a..5cbe5332ad1 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -31,6 +31,8 @@ NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext&
NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
+NNodes::TExprBase DqBuildPureFlatmapStage(NNodes::TExprBase node, TExprContext& ctx);
+
NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index fe17eebfff6..4ed3c51a273 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -28,6 +28,7 @@ public:
AddHandler(0, &TDqReadWrap::Match, HNDL(BuildStageWithReadWrap));
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
+ AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage));
@@ -227,6 +228,10 @@ protected:
return DqPushExtractMembersToStage(node, ctx, optCtx, *getParents(), IsGlobal);
}
+ TMaybeNode<TExprBase> BuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
+ return DqBuildPureFlatmapStage(node, ctx);
+ }
+
template <bool IsGlobal>
TMaybeNode<TExprBase> BuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqBuildFlatmapStage(node, ctx, optCtx, *getParents(), IsGlobal);