diff options
author | spuchin <[email protected]> | 2022-09-12 19:11:55 +0300 |
---|---|---|
committer | spuchin <[email protected]> | 2022-09-12 19:11:55 +0300 |
commit | 6a8a1de1b435c3ca6014dd3b2e9dd37b5dbf929d (patch) | |
tree | 3fb797b827fbc5dcc5cc7d3dad4279861d199fd8 | |
parent | 4200bcde8524e1f131a5e6af0ec7e051c82cf807 (diff) |
Phy plan for pure Flatmap with lambda connection. ()
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 37 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 62 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 5 |
5 files changed, 103 insertions, 10 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 223c276354a..9444349f134 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -38,6 +38,7 @@ public: AddHandler(0, &TDqPhyLength::Match, HNDL(PushOlapLength)); AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>)); AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>)); + AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>)); AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>)); AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage)); @@ -166,6 +167,12 @@ protected: return output; } + TMaybeNode<TExprBase> BuildPureFlatmapStage(TExprBase node, TExprContext& ctx) { + TExprBase output = DqBuildPureFlatmapStage(node, ctx); + DumpAppliedRule("BuildPureFlatmapStage", node.Ptr(), output.Ptr(), ctx); + return output; + } + template <bool IsGlobal> TMaybeNode<TExprBase> BuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 4157c7af6e5..d30d3b2656a 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3347,6 +3347,43 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([[[1];[101u];["Value1"]]])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(PushPureFlatmapInnerConnectionsToStage) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto params = kikimr.GetTableClient().GetParamsBuilder() + .AddParam("$rows").BeginList() + .AddListItem() + .BeginStruct() + .AddMember("Name").String("Name1") + .AddMember("Value2").String("Value22") + .AddMember("Data").String("Data1") + .EndStruct() + .EndList() + .Build() + .Build(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + + DECLARE $rows AS List<Struct< + Name: String, + Value2: String, + Data: String>>; + + $values = + SELECT (Name, Value2) FROM Join2 + WHERE Key1 = 101; + + SELECT * FROM AS_TABLE($rows) + WHERE (Name, Value2) IN COMPACT $values; + )", TTxControl::BeginTx().CommitTx(), params).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([["Data1";"Name1";"Value22"]])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index b3faaa51c3d..ca9bcead839 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -297,6 +297,19 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f .Done(); } +TExprNode::TListType FindLambdaConnections(const TCoLambda& lambda) { + auto filter = [](const TExprNode::TPtr& node) { + return !TMaybeNode<TDqPhyPrecompute>(node).IsValid(); + }; + + auto predicate = [](const TExprNode::TPtr& node) { + return TMaybeNode<TDqSource>(node).IsValid() || + TMaybeNode<TDqConnection>(node).IsValid(); + }; + + return FindNodes(lambda.Body().Ptr(), filter, predicate); +} + } // namespace TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& outputIndex, TCoLambda& lambda, @@ -438,6 +451,44 @@ TExprBase DqPushExtractMembersToStage(TExprBase node, TExprContext& ctx, IOptimi return DqPushMembersFilterToStage<TCoExtractMembers>(node, ctx, optCtx, parentsMap, allowStageMultiUsage); } +TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) { + if (!node.Maybe<TCoFlatMapBase>()) { + return node; + } + + auto flatmap = node.Cast<TCoFlatMapBase>(); + + if (!IsDqPureExpr(flatmap.Input())) { + return node; + } + + auto innerConnections = FindLambdaConnections(flatmap.Lambda()); + if (innerConnections.empty()) { + return node; + } + + auto inputStage = Build<TDqStage>(ctx, flatmap.Input().Pos()) + .Inputs() + .Build() + .Program() + .Args({}) + .Body<TCoIterator>() + .List(flatmap.Input()) + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, flatmap.Input().Pos())) + .Done(); + + auto inputConnection = Build<TDqCnUnionAll>(ctx, flatmap.Pos()) + .Output() + .Stage(inputStage) + .Index().Build("0") + .Build() + .Done(); + + return TExprBase(ctx.ChangeChild(flatmap.Ref(), TCoFlatMapBase::idx_Input, inputConnection.Ptr())); +} + TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { @@ -451,16 +502,7 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo return node; } - auto filter = [](const TExprNode::TPtr& node) { - return !TMaybeNode<TDqPhyPrecompute>(node).IsValid(); - }; - - auto predicate = [](const TExprNode::TPtr& node) { - return TMaybeNode<TDqSource>(node).IsValid() || - TMaybeNode<TDqConnection>(node).IsValid(); - }; - - auto innerConnections = FindNodes(flatmap.Lambda().Body().Ptr(), filter, predicate); + auto innerConnections = FindLambdaConnections(flatmap.Lambda()); TMaybeNode<TDqStage> flatmapStage; if (!innerConnections.empty()) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index e4d0d50a52a..5cbe5332ad1 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -31,6 +31,8 @@ NNodes::TExprBase DqPushOrderedLMapToStage(NNodes::TExprBase node, TExprContext& NNodes::TExprBase DqPushLMapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); +NNodes::TExprBase DqBuildPureFlatmapStage(NNodes::TExprBase node, TExprContext& ctx); + NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index fe17eebfff6..4ed3c51a273 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -28,6 +28,7 @@ public: AddHandler(0, &TDqReadWrap::Match, HNDL(BuildStageWithReadWrap)); AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>)); AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>)); + AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>)); AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>)); AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage)); @@ -227,6 +228,10 @@ protected: return DqPushExtractMembersToStage(node, ctx, optCtx, *getParents(), IsGlobal); } + TMaybeNode<TExprBase> BuildPureFlatmapStage(TExprBase node, TExprContext& ctx) { + return DqBuildPureFlatmapStage(node, ctx); + } + template <bool IsGlobal> TMaybeNode<TExprBase> BuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqBuildFlatmapStage(node, ctx, optCtx, *getParents(), IsGlobal); |