aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergei Puchin <s.puchin@gmail.com>2022-04-23 00:32:36 +0300
committerSergei Puchin <s.puchin@gmail.com>2022-04-23 00:32:36 +0300
commit19b525690e0c7788c39d741ea94023b64ae31a89 (patch)
treea98e8f59dfdc60ce93c3945e0d58bd6bdefecad4
parentfce2d29f7caacacb9e9b44da6761dc70558e3a95 (diff)
downloadydb-19b525690e0c7788c39d741ea94023b64ae31a89.tar.gz
Fix DqBuildScalarPrecompute rule (KIKIMR-14769)
ref:0b7407a0d60e47b8b4aa8a6cce99913a645b96a3
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp10
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp25
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp12
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h2
-rw-r--r--ydb/library/yql/dq/type_ann/dq_type_ann.cpp3
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp7
6 files changed, 44 insertions, 15 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 5b20590499..db89edff15 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -60,7 +60,7 @@ public:
AddHandler(0, &TCoUnorderedBase::Match, HNDL(DropUnordered));
AddHandler(0, &TDqStage::Match, HNDL(FloatUpStage));
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
- AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute));
+ AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>));
AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>));
AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
@@ -78,6 +78,7 @@ public:
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
+ AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>));
AddHandler(1, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<true>));
AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>));
AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>));
@@ -326,8 +327,11 @@ protected:
return output;
}
- TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {
- TExprBase output = DqBuildScalarPrecompute(node, ctx, optCtx);
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ TExprBase output = DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("BuildScalarPrecompute", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index 93a23f3115..3fec24c11f 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3141,6 +3141,31 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
.ExpectedReads = 1,
});
}
+
+ Y_UNIT_TEST(ScalarMultiUsage) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto params = kikimr.GetTableClient().GetParamsBuilder()
+ .AddParam("$key").Uint64(701).Build()
+ .Build();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = 'true';
+
+ DECLARE $key AS Uint64;
+
+ $row = (SELECT TableRow() FROM EightShard WHERE Key = $key);
+
+ SELECT $row.Text AS Text;
+ DELETE FROM EightShard WHERE Key = $key AND $row.Data > 0;
+ )", TTxControl::BeginTx().CommitTx(), params).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(R"([[["Value1"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 71cf0dcff8..a44f8bcfa2 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1557,7 +1557,9 @@ TExprBase DqBuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContex
return precompute;
}
-TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {
+TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
if (!node.Maybe<TCoToOptional>()) {
return node;
}
@@ -1569,6 +1571,9 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati
}
auto unionAll = toOptional.List().Cast<TDqCnUnionAll>();
+ if (!IsSingleConsumerConnection(unionAll, parentsMap, allowStageMultiUsage)) {
+ return node;
+ }
if (!unionAll.Output().Maybe<TDqOutput>()) {
return node;
@@ -1619,10 +1624,7 @@ TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizati
auto newProgram = Build<TCoLambda>(ctx, node.Pos())
.Args({lambdaArg})
- // DqOutput expects stream as input, thus form stream with one element
- .Body<TCoToStream>()
- .Input(valueExtractor)
- .Build()
+ .Body(valueExtractor)
.Done();
auto newUnion = DqPushLambdaToStageUnionAll(unionAll, newProgram, {}, ctx, optCtx);
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 693d993fdf..175ee25c30 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -94,6 +94,6 @@ NYql::NNodes::TExprBase DqBuildHasItems(NYql::NNodes::TExprBase node, NYql::TExp
NYql::IOptimizationContext& optCtx);
NYql::NNodes::TExprBase DqBuildScalarPrecompute(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
- NYql::IOptimizationContext& optCtx);
+ NYql::IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
index db912c530e..04ff29b03f 100644
--- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
+++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
@@ -731,9 +731,6 @@ TStatus AnnotateDqReplicate(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsurePersistableType(lambda->Pos(), *lambdaItemType, ctx)) {
return TStatus::Error;
}
- if (!EnsureStructType(lambda->Pos(), *lambdaItemType, ctx)) {
- return TStatus::Error;
- }
outputFlowItems.push_back(lambdaItemType);
}
auto resultItemType = ctx.MakeType<TVariantExprType>(ctx.MakeType<TTupleExprType>(outputFlowItems));
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index c9171de68c..657d7453e4 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -43,7 +43,7 @@ public:
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
#if 0
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
- AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute));
+ AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>));
#endif
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
@@ -292,8 +292,9 @@ protected:
return DqBuildHasItems(node, ctx, optCtx);
}
- TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) {
- return DqBuildScalarPrecompute(node, ctx, optCtx);
+ template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ return DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal);
}
};