aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-09-14 15:51:13 +0300
committerspuchin <spuchin@ydb.tech>2022-09-14 15:51:13 +0300
commit3ccd7883d02ccd67ba50bcb06e0f6d055ce91784 (patch)
tree5e5e28022b9f97d5fb6b5f1a2431e5758832c4ce
parent294d9a9529ab04699804027a0f98097506094ba4 (diff)
downloadydb-3ccd7883d02ccd67ba50bcb06e0f6d055ce91784.tar.gz
Phy rule for top-level SqlIn. ()
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp11
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp51
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.cpp40
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.h1
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp137
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.h3
-rw-r--r--ydb/library/yql/providers/dq/opt/physical_optimize.cpp7
7 files changed, 249 insertions, 1 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 9444349f13..608f15c18c 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -65,6 +65,7 @@ public:
AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput));
AddHandler(0, &TDqStage::Match, HNDL(FloatUpStage));
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
+ AddHandler(0, &TCoSqlIn::Match, HNDL(BuildSqlIn<false>));
AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>));
AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>));
AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>));
@@ -84,6 +85,7 @@ public:
AddHandler(1, &TDqJoin::Match, HNDL(BuildJoin<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
+ AddHandler(1, &TCoSqlIn::Match, HNDL(BuildSqlIn<true>));
AddHandler(1, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>));
AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>));
AddHandler(1, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<true>));
@@ -359,6 +361,15 @@ protected:
}
template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildSqlIn(TExprBase node, TExprContext& ctx,
+ IOptimizationContext& optCtx, const TGetParents& getParents)
+ {
+ TExprBase output = DqBuildSqlIn(node, ctx, optCtx, *getParents(), IsGlobal);
+ DumpAppliedRule("BuildSqlIn", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
+ template <bool IsGlobal>
TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index d30d3b2656..7764f5cdbe 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3343,7 +3343,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
SELECT * FROM `/Root/EightShard`
WHERE Key IN $subquery OR Key == 101 OR Key IN $subquery2;
- )", TTxControl::BeginTx()).ExtractValueSync();
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[[1];[101u];["Value1"]]])", FormatResultSetYson(result.GetResultSet(0)));
}
@@ -3384,6 +3384,55 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([["Data1";"Name1";"Value22"]])", FormatResultSetYson(result.GetResultSet(0)));
}
+
+ Y_UNIT_TEST(SqlInAsScalar) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto params = kikimr.GetTableClient().GetParamsBuilder()
+ .AddParam("$value1").Int32(3).Build()
+ .AddParam("$value2").Uint64(2).Build()
+ .AddParam("$value3").Int32(5).Build()
+ .AddParam("$value4").OptionalInt32(3).Build()
+ .AddParam("$value5").OptionalInt32({}).Build()
+ .AddParam("$value6").OptionalInt64(1).Build()
+ .AddParam("$value7").OptionalInt64(7).Build()
+ .Build();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+
+ DECLARE $value1 AS Int32;
+ DECLARE $value2 AS Uint64;
+ DECLARE $value3 AS Int32;
+ DECLARE $value4 AS Int32?;
+ DECLARE $value5 AS Int32?;
+ DECLARE $value6 AS Int64?;
+ DECLARE $value7 AS Int64?;
+
+ $data = SELECT Data FROM EightShard WHERE Text = "Value1";
+
+ SELECT
+ $value1 IN $data,
+ $value2 IN $data,
+ $value3 IN $data,
+ $value4 IN $data,
+ $value5 IN $data,
+ $value6 IN $data,
+ $value7 IN $data;
+ )", TTxControl::BeginTx().CommitTx(), params).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[
+ %true;
+ %true;
+ %false;
+ [%true];
+ #;
+ [%true];
+ [%false]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp
index d936a0939c..3ae4e9f285 100644
--- a/ydb/library/yql/dq/opt/dq_opt.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt.cpp
@@ -190,6 +190,46 @@ bool IsDqPureExpr(const TExprBase& node, bool isPrecomputePure) {
return !FindNode(node.Ptr(), predicate);
}
+bool IsDqSelfContainedExpr(const TExprBase& node) {
+ bool selfContained = true;
+ TNodeSet knownArguments;
+
+ VisitExpr(node.Ptr(),
+ [&selfContained, &knownArguments] (const TExprNode::TPtr& node) {
+ if (!selfContained) {
+ return false;
+ }
+
+ if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) {
+ for (const auto& arg : maybeLambda.Cast().Args()) {
+ YQL_ENSURE(knownArguments.emplace(arg.Raw()).second);
+ }
+ }
+
+ if (node->IsArgument()) {
+ if (!knownArguments.contains(node.Get())) {
+ selfContained = false;
+ return false;
+ }
+ }
+
+ return true;
+ },
+ [&knownArguments] (const TExprNode::TPtr& node) {
+ if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) {
+ for (const auto& arg : maybeLambda.Cast().Args()) {
+ auto it = knownArguments.find(arg.Raw());
+ YQL_ENSURE(it != knownArguments.end());
+ knownArguments.erase(it);
+ }
+ }
+
+ return true;
+ });
+
+ return selfContained;
+}
+
bool IsDqDependsOnStage(const TExprBase& node, const TDqStageBase& stage) {
return !!FindNode(node.Ptr(), [ptr = stage.Raw()](const TExprNode::TPtr& exprNode) {
return exprNode.Get() == ptr;
diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h
index 3d9a14e1f8..3887ef720a 100644
--- a/ydb/library/yql/dq/opt/dq_opt.h
+++ b/ydb/library/yql/dq/opt/dq_opt.h
@@ -58,6 +58,7 @@ ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage);
TVector<NNodes::TDqConnection> FindDqConnections(const NNodes::TExprBase& node);
bool IsDqPureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true);
+bool IsDqSelfContainedExpr(const NNodes::TExprBase& node);
bool IsDqDependsOnStage(const NNodes::TExprBase& node, const NNodes::TDqStageBase& stage);
bool CanPushDqExpr(const NNodes::TExprBase& expr, const NNodes::TDqStageBase& stage);
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index ca9bcead83..bab461d51a 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1755,6 +1755,143 @@ TExprBase DqBuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContex
return precompute;
}
+TExprBase DqBuildSqlIn(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage)
+{
+ if (!node.Maybe<TCoSqlIn>().Collection().Maybe<TDqCnUnionAll>()) {
+ return node;
+ }
+
+ auto sqlIn = node.Cast<TCoSqlIn>();
+ auto unionAll = sqlIn.Collection().Cast<TDqCnUnionAll>();
+
+ if (!IsSingleConsumerConnection(unionAll, parentsMap, allowStageMultiUsage)) {
+ return node;
+ }
+
+ if (!IsDqPureExpr(sqlIn.Lookup())) {
+ return node;
+ }
+
+ if (!IsDqSelfContainedExpr(sqlIn.Lookup())) {
+ return node;
+ }
+
+ if (auto connToPushableStage = DqBuildPushableStage(unionAll, ctx)) {
+ return TExprBase(ctx.ChangeChild(*node.Raw(), TCoSqlIn::idx_Collection, std::move(connToPushableStage)));
+ }
+
+ auto localProgram = Build<TCoLambda>(ctx, node.Pos())
+ .Args({"stream"})
+ .Body<TCoMap>()
+ .Input<TCoCondense>()
+ .Input("stream")
+ .State<TCoList>()
+ .ListType(ExpandType(node.Pos(), *unionAll.Ref().GetTypeAnn(), ctx))
+ .Build()
+ .SwitchHandler()
+ .Args({"item", "state"})
+ .Body(MakeBool<false>(node.Pos(), ctx))
+ .Build()
+ .UpdateHandler()
+ .Args({"item", "state"})
+ .Body<TCoAppend>()
+ .List("state")
+ .Item("item")
+ .Build()
+ .Build()
+ .Build()
+ .Lambda()
+ .Args({"list"})
+ .Body<TCoSqlIn>()
+ .Collection("list")
+ .Lookup(sqlIn.Lookup())
+ .Options(sqlIn.Options())
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+
+ auto newUnion = DqPushLambdaToStageUnionAll(unionAll, localProgram, {}, ctx, optCtx);
+
+ if (!newUnion.IsValid()) {
+ return node;
+ }
+
+ auto resultsArg = Build<TCoArgument>(ctx, node.Pos())
+ .Name("results_stream")
+ .Done();
+
+ TExprBase finalProgram = Build<TCoCondense>(ctx, node.Pos())
+ .Input(resultsArg)
+ .State<TCoJust>()
+ .Input<TCoBool>()
+ .Literal().Build("false")
+ .Build()
+ .Build()
+ .SwitchHandler()
+ .Args({"item", "state"})
+ .Body(MakeBool<false>(node.Pos(), ctx))
+ .Build()
+ .UpdateHandler()
+ .Args({"item", "state"})
+ .Body<TCoIf>()
+ .Predicate<TCoExists>()
+ .Optional("state")
+ .Build()
+ .ThenValue<TCoIf>()
+ .Predicate<TCoExists>()
+ .Optional("item")
+ .Build()
+ .ThenValue<TCoIf>()
+ .Predicate<TCoCoalesce>()
+ .Predicate("item")
+ .Value(MakeBool<false>(node.Pos(), ctx))
+ .Build()
+ .ThenValue("item")
+ .ElseValue("state")
+ .Build()
+ .ElseValue<TCoNull>().Build()
+ .Build()
+ .ElseValue<TCoNull>().Build()
+ .Build()
+ .Build()
+ .Done();
+
+ if (sqlIn.Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Optional) {
+ finalProgram = Build<TCoMap>(ctx, node.Pos())
+ .Input(finalProgram)
+ .Lambda()
+ .Args({"result"})
+ // Result can't be NULL, double check here.
+ .Body<TCoUnwrap>()
+ .Optional("result")
+ .Build()
+ .Build()
+ .Done();
+ }
+
+ auto stage = Build<TDqStage>(ctx, node.Pos())
+ .Inputs()
+ .Add(newUnion.Cast())
+ .Build()
+ .Program()
+ .Args({resultsArg})
+ .Body(finalProgram)
+ .Build()
+ .Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
+ .Done();
+
+ return Build<TDqPrecompute>(ctx, node.Pos())
+ .Input<TDqCnValue>()
+ .Output<TDqOutput>()
+ .Stage(stage)
+ .Index().Build("0")
+ .Build()
+ .Build()
+ .Done();
+}
+
TExprBase DqBuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h
index 5cbe5332ad..87b8f5a704 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.h
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.h
@@ -97,6 +97,9 @@ NNodes::TExprBase DqBuildPrecompute(NNodes::TExprBase node, TExprContext& ctx);
NYql::NNodes::TExprBase DqBuildHasItems(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
NYql::IOptimizationContext& optCtx);
+NNodes::TExprBase DqBuildSqlIn(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
+ const TParentsMap& parentsMap, bool allowStageMultiUsage);
+
NYql::NNodes::TExprBase DqBuildScalarPrecompute(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
NYql::IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage);
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index fe17eebfff..ddd45515bb 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -46,6 +46,7 @@ public:
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
if (enablePrecompute) {
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
+ AddHandler(0, &TCoSqlIn::Match, HNDL(BuildSqlIn<false>));
AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>));
AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>));
AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute));
@@ -67,6 +68,7 @@ public:
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
if (enablePrecompute) {
+ AddHandler(1, &TCoSqlIn::Match, HNDL(BuildSqlIn<true>));
AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>));
AddHandler(1, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>));
AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>));
@@ -307,6 +309,11 @@ protected:
}
template <bool IsGlobal>
+ TMaybeNode<TExprBase> BuildSqlIn(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
+ return DqBuildSqlIn(node, ctx, optCtx, *getParents(), IsGlobal);
+ }
+
+ template <bool IsGlobal>
TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal);
}