diff options
author | spuchin <[email protected]> | 2023-02-04 14:33:03 +0300 |
---|---|---|
committer | spuchin <[email protected]> | 2023-02-04 14:33:03 +0300 |
commit | 478f068cf2e44bce08b28c1c1f2e5f8eb67ad2e8 (patch) | |
tree | 3fe109e63801e3b4ffc9625b20531496292aef08 | |
parent | 44f483e5f9bd958243bbb4ee3d044c6e7bba3c87 (diff) |
Fix opt rules for aggs with dict lookups. ()
-rw-r--r-- | ydb/core/kqp/ut/opt/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_agg_ut.cpp | 94 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt.cpp | 6 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 34 |
6 files changed, 131 insertions, 6 deletions
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt b/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt index 61c4315a0b8..7dd19cdc4c0 100644 --- a/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt +++ b/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt @@ -33,6 +33,7 @@ target_link_options(ydb-core-kqp-ut-opt PRIVATE CoreFoundation ) target_sources(ydb-core-kqp-ut-opt PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt index 8545804489f..59fe902ab4a 100644 --- a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt @@ -36,6 +36,7 @@ target_link_options(ydb-core-kqp-ut-opt PRIVATE -ldl ) target_sources(ydb-core-kqp-ut-opt PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux.txt index f9088ead2dc..bed67cb1983 100644 --- a/ydb/core/kqp/ut/opt/CMakeLists.linux.txt +++ b/ydb/core/kqp/ut/opt/CMakeLists.linux.txt @@ -38,6 +38,7 @@ target_link_options(ydb-core-kqp-ut-opt PRIVATE -ldl ) target_sources(ydb-core-kqp-ut-opt PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp diff --git a/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp b/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp new file mode 100644 index 00000000000..2000c73ad02 --- /dev/null +++ b/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp @@ -0,0 +1,94 @@ +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> + +namespace NKikimr::NKqp { + +using namespace NYdb; +using namespace NYdb::NTable; + +Y_UNIT_TEST_SUITE(KqpAgg) { + Y_UNIT_TEST(AggWithLookup) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + $dict = + SELECT ToDict(AGGREGATE_LIST(AsTuple(Value2, AsStruct(Key as Lookup)))) + FROM TwoShard + WHERE Key < 10; + + SELECT + Text, + SUM(DictLookup($dict, Data).Lookup) AS SumLookup + FROM EightShard + GROUP BY Text + ORDER BY SumLookup, Text; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [["Value3"];[6u]]; + [["Value1"];[9u]]; + [["Value2"];[9u]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(AggWithSelfLookup) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + $dict = + SELECT ToDict(AGGREGATE_LIST(AsTuple(Key - 100, AsStruct(Data as Lookup)))) + FROM EightShard; + + SELECT + Text, + SUM(DictLookup($dict, Data).Lookup) AS SumLookup + FROM EightShard + GROUP BY Text + ORDER BY SumLookup, Text; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [["Value2"];[15]]; + [["Value1"];[16]]; + [["Value3"];[17]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(AggWithSelfLookup2) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + $dict = + SELECT ToDict(AGGREGATE_LIST(AsTuple(Key - 100, AsStruct(Data as Lookup)))) + FROM EightShard; + + SELECT Text, SUM(Lookup) AS SumLookup + FROM ( + SELECT Text, DictLookup($dict, MIN(Data)).Lookup AS Lookup + FROM EightShard + GROUP BY Text + ) + GROUP BY Text + ORDER BY SumLookup, Text; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [["Value1"];[1]]; + [["Value2"];[1]]; + [["Value3"];[1]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } +} + +} // namespace NKikimr::NKqp diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index 3ae4e9f2858..8cffde696ae 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -215,7 +215,11 @@ bool IsDqSelfContainedExpr(const TExprBase& node) { return true; }, - [&knownArguments] (const TExprNode::TPtr& node) { + [&selfContained, &knownArguments] (const TExprNode::TPtr& node) { + if (!selfContained) { + return false; + } + if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) { for (const auto& arg : maybeLambda.Cast().Args()) { auto it = knownArguments.find(arg.Raw()); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 2ec20b104e6..a3bdbedf15b 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -698,6 +698,10 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) { return node; } + if (!IsDqSelfContainedExpr(flatmap.Input())) { + return node; + } + auto innerConnections = FindDqConnections(flatmap.Lambda()); if (innerConnections.empty()) { return node; @@ -960,11 +964,11 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC return node; } - if (!CanPushDqExpr(combine.PreMapLambda(), dqUnion) || - !CanPushDqExpr(combine.KeySelectorLambda(), dqUnion) || - !CanPushDqExpr(combine.InitHandlerLambda(), dqUnion) || - !CanPushDqExpr(combine.UpdateHandlerLambda(), dqUnion) || - !CanPushDqExpr(combine.FinishHandlerLambda(), dqUnion)) + if (!IsDqPureExpr(combine.PreMapLambda()) || + !IsDqPureExpr(combine.KeySelectorLambda()) || + !IsDqPureExpr(combine.InitHandlerLambda()) || + !IsDqPureExpr(combine.UpdateHandlerLambda()) || + !IsDqPureExpr(combine.FinishHandlerLambda())) { return node; } @@ -1000,6 +1004,26 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC .Done(); } + if (IsDqDependsOnStage(combine.PreMapLambda(), dqUnion.Output().Stage()) || + IsDqDependsOnStage(combine.KeySelectorLambda(), dqUnion.Output().Stage()) || + IsDqDependsOnStage(combine.InitHandlerLambda(), dqUnion.Output().Stage()) || + IsDqDependsOnStage(combine.UpdateHandlerLambda(), dqUnion.Output().Stage()) || + IsDqDependsOnStage(combine.FinishHandlerLambda(), dqUnion.Output().Stage())) + { + return Build<TDqCnUnionAll>(ctx, combine.Pos()) + .Output() + .Stage<TDqStage>() + .Inputs() + .Add(dqUnion) + .Build() + .Program(lambda) + .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) + .Build() + .Index().Build("0") + .Build() + .Done(); + } + auto result = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx); if (!result) { return node; |