summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <[email protected]>2023-02-04 14:33:03 +0300
committerspuchin <[email protected]>2023-02-04 14:33:03 +0300
commit478f068cf2e44bce08b28c1c1f2e5f8eb67ad2e8 (patch)
tree3fe109e63801e3b4ffc9625b20531496292aef08
parent44f483e5f9bd958243bbb4ee3d044c6e7bba3c87 (diff)
Fix opt rules for aggs with dict lookups. ()
-rw-r--r--ydb/core/kqp/ut/opt/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/ut/opt/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/ut/opt/kqp_agg_ut.cpp94
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.cpp6
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp34
6 files changed, 131 insertions, 6 deletions
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt b/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt
index 61c4315a0b8..7dd19cdc4c0 100644
--- a/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/ut/opt/CMakeLists.darwin.txt
@@ -33,6 +33,7 @@ target_link_options(ydb-core-kqp-ut-opt PRIVATE
CoreFoundation
)
target_sources(ydb-core-kqp-ut-opt PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt
index 8545804489f..59fe902ab4a 100644
--- a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt
@@ -36,6 +36,7 @@ target_link_options(ydb-core-kqp-ut-opt PRIVATE
-ldl
)
target_sources(ydb-core-kqp-ut-opt PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux.txt
index f9088ead2dc..bed67cb1983 100644
--- a/ydb/core/kqp/ut/opt/CMakeLists.linux.txt
+++ b/ydb/core/kqp/ut/opt/CMakeLists.linux.txt
@@ -38,6 +38,7 @@ target_link_options(ydb-core-kqp-ut-opt PRIVATE
-ldl
)
target_sources(ydb-core-kqp-ut-opt PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
diff --git a/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp b/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp
new file mode 100644
index 00000000000..2000c73ad02
--- /dev/null
+++ b/ydb/core/kqp/ut/opt/kqp_agg_ut.cpp
@@ -0,0 +1,94 @@
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+
+namespace NKikimr::NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+Y_UNIT_TEST_SUITE(KqpAgg) {
+ Y_UNIT_TEST(AggWithLookup) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ $dict =
+ SELECT ToDict(AGGREGATE_LIST(AsTuple(Value2, AsStruct(Key as Lookup))))
+ FROM TwoShard
+ WHERE Key < 10;
+
+ SELECT
+ Text,
+ SUM(DictLookup($dict, Data).Lookup) AS SumLookup
+ FROM EightShard
+ GROUP BY Text
+ ORDER BY SumLookup, Text;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [["Value3"];[6u]];
+ [["Value1"];[9u]];
+ [["Value2"];[9u]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST(AggWithSelfLookup) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ $dict =
+ SELECT ToDict(AGGREGATE_LIST(AsTuple(Key - 100, AsStruct(Data as Lookup))))
+ FROM EightShard;
+
+ SELECT
+ Text,
+ SUM(DictLookup($dict, Data).Lookup) AS SumLookup
+ FROM EightShard
+ GROUP BY Text
+ ORDER BY SumLookup, Text;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [["Value2"];[15]];
+ [["Value1"];[16]];
+ [["Value3"];[17]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST(AggWithSelfLookup2) {
+ TKikimrRunner kikimr;
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ $dict =
+ SELECT ToDict(AGGREGATE_LIST(AsTuple(Key - 100, AsStruct(Data as Lookup))))
+ FROM EightShard;
+
+ SELECT Text, SUM(Lookup) AS SumLookup
+ FROM (
+ SELECT Text, DictLookup($dict, MIN(Data)).Lookup AS Lookup
+ FROM EightShard
+ GROUP BY Text
+ )
+ GROUP BY Text
+ ORDER BY SumLookup, Text;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [["Value1"];[1]];
+ [["Value2"];[1]];
+ [["Value3"];[1]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp
index 3ae4e9f2858..8cffde696ae 100644
--- a/ydb/library/yql/dq/opt/dq_opt.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt.cpp
@@ -215,7 +215,11 @@ bool IsDqSelfContainedExpr(const TExprBase& node) {
return true;
},
- [&knownArguments] (const TExprNode::TPtr& node) {
+ [&selfContained, &knownArguments] (const TExprNode::TPtr& node) {
+ if (!selfContained) {
+ return false;
+ }
+
if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) {
for (const auto& arg : maybeLambda.Cast().Args()) {
auto it = knownArguments.find(arg.Raw());
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 2ec20b104e6..a3bdbedf15b 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -698,6 +698,10 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
return node;
}
+ if (!IsDqSelfContainedExpr(flatmap.Input())) {
+ return node;
+ }
+
auto innerConnections = FindDqConnections(flatmap.Lambda());
if (innerConnections.empty()) {
return node;
@@ -960,11 +964,11 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC
return node;
}
- if (!CanPushDqExpr(combine.PreMapLambda(), dqUnion) ||
- !CanPushDqExpr(combine.KeySelectorLambda(), dqUnion) ||
- !CanPushDqExpr(combine.InitHandlerLambda(), dqUnion) ||
- !CanPushDqExpr(combine.UpdateHandlerLambda(), dqUnion) ||
- !CanPushDqExpr(combine.FinishHandlerLambda(), dqUnion))
+ if (!IsDqPureExpr(combine.PreMapLambda()) ||
+ !IsDqPureExpr(combine.KeySelectorLambda()) ||
+ !IsDqPureExpr(combine.InitHandlerLambda()) ||
+ !IsDqPureExpr(combine.UpdateHandlerLambda()) ||
+ !IsDqPureExpr(combine.FinishHandlerLambda()))
{
return node;
}
@@ -1000,6 +1004,26 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC
.Done();
}
+ if (IsDqDependsOnStage(combine.PreMapLambda(), dqUnion.Output().Stage()) ||
+ IsDqDependsOnStage(combine.KeySelectorLambda(), dqUnion.Output().Stage()) ||
+ IsDqDependsOnStage(combine.InitHandlerLambda(), dqUnion.Output().Stage()) ||
+ IsDqDependsOnStage(combine.UpdateHandlerLambda(), dqUnion.Output().Stage()) ||
+ IsDqDependsOnStage(combine.FinishHandlerLambda(), dqUnion.Output().Stage()))
+ {
+ return Build<TDqCnUnionAll>(ctx, combine.Pos())
+ .Output()
+ .Stage<TDqStage>()
+ .Inputs()
+ .Add(dqUnion)
+ .Build()
+ .Program(lambda)
+ .Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
+ .Build()
+ .Index().Build("0")
+ .Build()
+ .Done();
+ }
+
auto result = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
if (!result) {
return node;