diff options
author | spuchin <spuchin@ydb.tech> | 2022-09-14 15:51:13 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-09-14 15:51:13 +0300 |
commit | 3ccd7883d02ccd67ba50bcb06e0f6d055ce91784 (patch) | |
tree | 5e5e28022b9f97d5fb6b5f1a2431e5758832c4ce | |
parent | 294d9a9529ab04699804027a0f98097506094ba4 (diff) | |
download | ydb-3ccd7883d02ccd67ba50bcb06e0f6d055ce91784.tar.gz |
Phy rule for top-level SqlIn. ()
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 51 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt.cpp | 40 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 137 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 7 |
7 files changed, 249 insertions, 1 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 9444349f13..608f15c18c 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -65,6 +65,7 @@ public: AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput)); AddHandler(0, &TDqStage::Match, HNDL(FloatUpStage)); AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); + AddHandler(0, &TCoSqlIn::Match, HNDL(BuildSqlIn<false>)); AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>)); AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>)); AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>)); @@ -84,6 +85,7 @@ public: AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); + AddHandler(1, &TCoSqlIn::Match, HNDL(BuildSqlIn<true>)); AddHandler(1, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>)); AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>)); AddHandler(1, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<true>)); @@ -359,6 +361,15 @@ protected: } template <bool IsGlobal> + TMaybeNode<TExprBase> BuildSqlIn(TExprBase node, TExprContext& ctx, + IOptimizationContext& optCtx, const TGetParents& getParents) + { + TExprBase output = DqBuildSqlIn(node, ctx, optCtx, *getParents(), IsGlobal); + DumpAppliedRule("BuildSqlIn", node.Ptr(), output.Ptr(), ctx); + return output; + } + + template <bool IsGlobal> TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index d30d3b2656..7764f5cdbe 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3343,7 +3343,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { SELECT * FROM `/Root/EightShard` WHERE Key IN $subquery OR Key == 101 OR Key IN $subquery2; - )", TTxControl::BeginTx()).ExtractValueSync(); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([[[1];[101u];["Value1"]]])", FormatResultSetYson(result.GetResultSet(0))); } @@ -3384,6 +3384,55 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([["Data1";"Name1";"Value22"]])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(SqlInAsScalar) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto params = kikimr.GetTableClient().GetParamsBuilder() + .AddParam("$value1").Int32(3).Build() + .AddParam("$value2").Uint64(2).Build() + .AddParam("$value3").Int32(5).Build() + .AddParam("$value4").OptionalInt32(3).Build() + .AddParam("$value5").OptionalInt32({}).Build() + .AddParam("$value6").OptionalInt64(1).Build() + .AddParam("$value7").OptionalInt64(7).Build() + .Build(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + + DECLARE $value1 AS Int32; + DECLARE $value2 AS Uint64; + DECLARE $value3 AS Int32; + DECLARE $value4 AS Int32?; + DECLARE $value5 AS Int32?; + DECLARE $value6 AS Int64?; + DECLARE $value7 AS Int64?; + + $data = SELECT Data FROM EightShard WHERE Text = "Value1"; + + SELECT + $value1 IN $data, + $value2 IN $data, + $value3 IN $data, + $value4 IN $data, + $value5 IN $data, + $value6 IN $data, + $value7 IN $data; + )", TTxControl::BeginTx().CommitTx(), params).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[ + %true; + %true; + %false; + [%true]; + #; + [%true]; + [%false]]])", FormatResultSetYson(result.GetResultSet(0))); + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index d936a0939c..3ae4e9f285 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -190,6 +190,46 @@ bool IsDqPureExpr(const TExprBase& node, bool isPrecomputePure) { return !FindNode(node.Ptr(), predicate); } +bool IsDqSelfContainedExpr(const TExprBase& node) { + bool selfContained = true; + TNodeSet knownArguments; + + VisitExpr(node.Ptr(), + [&selfContained, &knownArguments] (const TExprNode::TPtr& node) { + if (!selfContained) { + return false; + } + + if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) { + for (const auto& arg : maybeLambda.Cast().Args()) { + YQL_ENSURE(knownArguments.emplace(arg.Raw()).second); + } + } + + if (node->IsArgument()) { + if (!knownArguments.contains(node.Get())) { + selfContained = false; + return false; + } + } + + return true; + }, + [&knownArguments] (const TExprNode::TPtr& node) { + if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) { + for (const auto& arg : maybeLambda.Cast().Args()) { + auto it = knownArguments.find(arg.Raw()); + YQL_ENSURE(it != knownArguments.end()); + knownArguments.erase(it); + } + } + + return true; + }); + + return selfContained; +} + bool IsDqDependsOnStage(const TExprBase& node, const TDqStageBase& stage) { return !!FindNode(node.Ptr(), [ptr = stage.Raw()](const TExprNode::TPtr& exprNode) { return exprNode.Get() == ptr; diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h index 3d9a14e1f8..3887ef720a 100644 --- a/ydb/library/yql/dq/opt/dq_opt.h +++ b/ydb/library/yql/dq/opt/dq_opt.h @@ -58,6 +58,7 @@ ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage); TVector<NNodes::TDqConnection> FindDqConnections(const NNodes::TExprBase& node); bool IsDqPureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true); +bool IsDqSelfContainedExpr(const NNodes::TExprBase& node); bool IsDqDependsOnStage(const NNodes::TExprBase& node, const NNodes::TDqStageBase& stage); bool CanPushDqExpr(const NNodes::TExprBase& expr, const NNodes::TDqStageBase& stage); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index ca9bcead83..bab461d51a 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1755,6 +1755,143 @@ TExprBase DqBuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContex return precompute; } +TExprBase DqBuildSqlIn(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage) +{ + if (!node.Maybe<TCoSqlIn>().Collection().Maybe<TDqCnUnionAll>()) { + return node; + } + + auto sqlIn = node.Cast<TCoSqlIn>(); + auto unionAll = sqlIn.Collection().Cast<TDqCnUnionAll>(); + + if (!IsSingleConsumerConnection(unionAll, parentsMap, allowStageMultiUsage)) { + return node; + } + + if (!IsDqPureExpr(sqlIn.Lookup())) { + return node; + } + + if (!IsDqSelfContainedExpr(sqlIn.Lookup())) { + return node; + } + + if (auto connToPushableStage = DqBuildPushableStage(unionAll, ctx)) { + return TExprBase(ctx.ChangeChild(*node.Raw(), TCoSqlIn::idx_Collection, std::move(connToPushableStage))); + } + + auto localProgram = Build<TCoLambda>(ctx, node.Pos()) + .Args({"stream"}) + .Body<TCoMap>() + .Input<TCoCondense>() + .Input("stream") + .State<TCoList>() + .ListType(ExpandType(node.Pos(), *unionAll.Ref().GetTypeAnn(), ctx)) + .Build() + .SwitchHandler() + .Args({"item", "state"}) + .Body(MakeBool<false>(node.Pos(), ctx)) + .Build() + .UpdateHandler() + .Args({"item", "state"}) + .Body<TCoAppend>() + .List("state") + .Item("item") + .Build() + .Build() + .Build() + .Lambda() + .Args({"list"}) + .Body<TCoSqlIn>() + .Collection("list") + .Lookup(sqlIn.Lookup()) + .Options(sqlIn.Options()) + .Build() + .Build() + .Build() + .Done(); + + auto newUnion = DqPushLambdaToStageUnionAll(unionAll, localProgram, {}, ctx, optCtx); + + if (!newUnion.IsValid()) { + return node; + } + + auto resultsArg = Build<TCoArgument>(ctx, node.Pos()) + .Name("results_stream") + .Done(); + + TExprBase finalProgram = Build<TCoCondense>(ctx, node.Pos()) + .Input(resultsArg) + .State<TCoJust>() + .Input<TCoBool>() + .Literal().Build("false") + .Build() + .Build() + .SwitchHandler() + .Args({"item", "state"}) + .Body(MakeBool<false>(node.Pos(), ctx)) + .Build() + .UpdateHandler() + .Args({"item", "state"}) + .Body<TCoIf>() + .Predicate<TCoExists>() + .Optional("state") + .Build() + .ThenValue<TCoIf>() + .Predicate<TCoExists>() + .Optional("item") + .Build() + .ThenValue<TCoIf>() + .Predicate<TCoCoalesce>() + .Predicate("item") + .Value(MakeBool<false>(node.Pos(), ctx)) + .Build() + .ThenValue("item") + .ElseValue("state") + .Build() + .ElseValue<TCoNull>().Build() + .Build() + .ElseValue<TCoNull>().Build() + .Build() + .Build() + .Done(); + + if (sqlIn.Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional) { + finalProgram = Build<TCoMap>(ctx, node.Pos()) + .Input(finalProgram) + .Lambda() + .Args({"result"}) + // Result can't be NULL, double check here. + .Body<TCoUnwrap>() + .Optional("result") + .Build() + .Build() + .Done(); + } + + auto stage = Build<TDqStage>(ctx, node.Pos()) + .Inputs() + .Add(newUnion.Cast()) + .Build() + .Program() + .Args({resultsArg}) + .Body(finalProgram) + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) + .Done(); + + return Build<TDqPrecompute>(ctx, node.Pos()) + .Input<TDqCnValue>() + .Output<TDqOutput>() + .Stage(stage) + .Index().Build("0") + .Build() + .Build() + .Done(); +} + TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 5cbe5332ad..87b8f5a704 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -97,6 +97,9 @@ NNodes::TExprBase DqBuildPrecompute(NNodes::TExprBase node, TExprContext& ctx); NYql::NNodes::TExprBase DqBuildHasItems(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, NYql::IOptimizationContext& optCtx); +NNodes::TExprBase DqBuildSqlIn(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, + const TParentsMap& parentsMap, bool allowStageMultiUsage); + NYql::NNodes::TExprBase DqBuildScalarPrecompute(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, NYql::IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage); diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index fe17eebfff..ddd45515bb 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -46,6 +46,7 @@ public: AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); if (enablePrecompute) { AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); + AddHandler(0, &TCoSqlIn::Match, HNDL(BuildSqlIn<false>)); AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>)); AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>)); AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute)); @@ -67,6 +68,7 @@ public: AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); if (enablePrecompute) { + AddHandler(1, &TCoSqlIn::Match, HNDL(BuildSqlIn<true>)); AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>)); AddHandler(1, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>)); AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>)); @@ -307,6 +309,11 @@ protected: } template <bool IsGlobal> + TMaybeNode<TExprBase> BuildSqlIn(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + return DqBuildSqlIn(node, ctx, optCtx, *getParents(), IsGlobal); + } + + template <bool IsGlobal> TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal); } |