diff options
author | Sergei Puchin <s.puchin@gmail.com> | 2022-04-23 00:32:36 +0300 |
---|---|---|
committer | Sergei Puchin <s.puchin@gmail.com> | 2022-04-23 00:32:36 +0300 |
commit | 19b525690e0c7788c39d741ea94023b64ae31a89 (patch) | |
tree | a98e8f59dfdc60ce93c3945e0d58bd6bdefecad4 | |
parent | fce2d29f7caacacb9e9b44da6761dc70558e3a95 (diff) | |
download | ydb-19b525690e0c7788c39d741ea94023b64ae31a89.tar.gz |
Fix DqBuildScalarPrecompute rule (KIKIMR-14769)
ref:0b7407a0d60e47b8b4aa8a6cce99913a645b96a3
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 25 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 12 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 7 |
6 files changed, 44 insertions, 15 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 5b20590499..db89edff15 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -60,7 +60,7 @@ public: AddHandler(0, &TCoUnorderedBase::Match, HNDL(DropUnordered)); AddHandler(0, &TDqStage::Match, HNDL(FloatUpStage)); AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); - AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute)); + AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>)); AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>)); AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>)); AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>)); @@ -78,6 +78,7 @@ public: AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); + AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>)); AddHandler(1, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<true>)); AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>)); AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>)); @@ -326,8 +327,11 @@ protected: return output; } - TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { - TExprBase output = DqBuildScalarPrecompute(node, ctx, optCtx); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + TExprBase output = DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal); DumpAppliedRule("BuildScalarPrecompute", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 93a23f3115..3fec24c11f 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3141,6 +3141,31 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { .ExpectedReads = 1, }); } + + Y_UNIT_TEST(ScalarMultiUsage) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto params = kikimr.GetTableClient().GetParamsBuilder() + .AddParam("$key").Uint64(701).Build() + .Build(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = 'true'; + + DECLARE $key AS Uint64; + + $row = (SELECT TableRow() FROM EightShard WHERE Key = $key); + + SELECT $row.Text AS Text; + DELETE FROM EightShard WHERE Key = $key AND $row.Data > 0; + )", TTxControl::BeginTx().CommitTx(), params).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(R"([[["Value1"]]])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 71cf0dcff8..a44f8bcfa2 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1557,7 +1557,9 @@ TExprBase DqBuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContex return precompute; } -TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { +TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ if (!node.Maybe<TCoToOptional>()) { return node; } @@ -1569,6 +1571,9 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati } auto unionAll = toOptional.List().Cast<TDqCnUnionAll>(); + if (!IsSingleConsumerConnection(unionAll, parentsMap, allowStageMultiUsage)) { + return node; + } if (!unionAll.Output().Maybe<TDqOutput>()) { return node; @@ -1619,10 +1624,7 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati auto newProgram = Build<TCoLambda>(ctx, node.Pos()) .Args({lambdaArg}) - // DqOutput expects stream as input, thus form stream with one element - .Body<TCoToStream>() - .Input(valueExtractor) - .Build() + .Body(valueExtractor) .Done(); auto newUnion = DqPushLambdaToStageUnionAll(unionAll, newProgram, {}, ctx, optCtx); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 693d993fdf..175ee25c30 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -94,6 +94,6 @@ NYql::NNodes::TExprBase DqBuildHasItems(NYql::NNodes::TExprBase node, NYql::TExp NYql::IOptimizationContext& optCtx); NYql::NNodes::TExprBase DqBuildScalarPrecompute(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, - NYql::IOptimizationContext& optCtx); + NYql::IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index db912c530e..04ff29b03f 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -731,9 +731,6 @@ TStatus AnnotateDqReplicate(const TExprNode::TPtr& input, TExprContext& ctx) { if (!EnsurePersistableType(lambda->Pos(), *lambdaItemType, ctx)) { return TStatus::Error; } - if (!EnsureStructType(lambda->Pos(), *lambdaItemType, ctx)) { - return TStatus::Error; - } outputFlowItems.push_back(lambdaItemType); } auto resultItemType = ctx.MakeType<TVariantExprType>(ctx.MakeType<TTupleExprType>(outputFlowItems)); diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index c9171de68c..657d7453e4 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -43,7 +43,7 @@ public: AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); #if 0 AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); - AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute)); + AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>)); #endif AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>)); @@ -292,8 +292,9 @@ protected: return DqBuildHasItems(node, ctx, optCtx); } - TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { - return DqBuildScalarPrecompute(node, ctx, optCtx); + template <bool IsGlobal> + TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal); } }; |