aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <aidarsamer@ydb.tech>2022-08-31 18:50:03 +0300
committeraidarsamer <aidarsamer@ydb.tech>2022-08-31 18:50:03 +0300
commit4dbaa68bba659ec45b6b86b6ef4d5386541fb81c (patch)
tree80f5a1524c754a459058db15dfd38bff31df1bd3
parentd3520f6619e3db5afdd96eba52f2dac24418e075 (diff)
downloadydb-4dbaa68bba659ec45b6b86b6ef4d5386541fb81c.tar.gz
rewrite SUM using AggApply if pragma EmitAggApply is set.
rewrite SUM using AggApply if pargma EmitAggApply is set.
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp45
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp28
-rw-r--r--ydb/library/yql/sql/v1/aggregation.cpp3
3 files changed, 73 insertions, 3 deletions
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index f33e925d30..a8a86168fc 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -1284,7 +1284,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
- Y_UNIT_TEST(AggregationPushdown) {
+ Y_UNIT_TEST(AggregationCountPushdown) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
.SetEnableOlapSchemaOperations(true);
@@ -1308,6 +1308,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString query = R"(
--!syntax_v1
PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ PRAGMA EmitAggApply;
SELECT
COUNT(level)
FROM `/Root/olapStore/olapTable`
@@ -1326,7 +1327,47 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
- Y_UNIT_TEST(AggregationGroupByPushdown) {
+ Y_UNIT_TEST(AggregationSumPushdown) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetEnableOlapSchemaOperations(true);
+ TKikimrRunner kikimr(settings);
+
+ // EnableDebugLogging(kikimr);
+ CreateTestOlapTable(kikimr);
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 11000, 3001000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 12000, 3002000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 13000, 3003000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 14000, 3004000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 20000, 2000000, 7000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 30000, 1000000, 11000);
+ }
+
+ {
+ TString query = R"(
+ --!syntax_v1
+ PRAGMA Kikimr.KqpPushOlapProcess = "true";
+ PRAGMA EmitAggApply;
+ SELECT
+ SUM(level)
+ FROM `/Root/olapStore/olapTable`
+ )";
+ auto it = tableClient.StreamExecuteScanQuery(query).GetValueSync();
+
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ CompareYson(result, R"([[[46000;]]])");
+
+ // Check plan
+ // CheckPlanForAggregatePushdown(query, tableClient, { "TKqpOlapAgg" });
+ }
+ }
+
+ Y_UNIT_TEST(AggregationCountGroupByPushdown) {
// remove this return when GROUP BY will be implemented on columnshard
return;
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 2e07fa8399..17c32e5eb3 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -4970,6 +4970,34 @@ namespace {
if (name == "count" || name == "count_all") {
input->SetTypeAnn(ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64));
+ } else if (name == "sum") {
+ bool isOptional;
+ const TDataExprType* lambdaType;
+ if(IsDataOrOptionalOfData(lambda->GetTypeAnn(), isOptional, lambdaType)) {
+ auto lambdaTypeSlot = lambdaType->GetSlot();
+ const TTypeAnnotationNode *sumResultType = nullptr;
+ if (IsDataTypeSigned(lambdaTypeSlot)) {
+ sumResultType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
+ } else if (IsDataTypeUnsigned(lambdaTypeSlot)) {
+ sumResultType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Uint64);
+ } else if (IsDataTypeDecimal(lambdaTypeSlot)) {
+ const auto decimalType = lambdaType->Cast<TDataExprParamsType>();
+ sumResultType = ctx.Expr.MakeType<TDataExprParamsType>(EDataSlot::Decimal, "35", decimalType->GetParamTwo());
+ } else if (IsDataTypeFloat(lambdaTypeSlot) || IsDataTypeInterval(lambdaTypeSlot)) {
+ sumResultType = ctx.Expr.MakeType<TDataExprType>(lambdaTypeSlot);
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unsupported column type: " << lambdaTypeSlot));
+ return IGraphTransformer::TStatus::Error;
+ }
+ input->SetTypeAnn(ctx.Expr.MakeType<TOptionalExprType>(sumResultType));
+ } else if (IsNull(*lambda->GetTypeAnn())) {
+ input->SetTypeAnn(ctx.Expr.MakeType<TNullExprType>());
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ TStringBuilder() << "Unsupported type: " << FormatType(lambda->GetTypeAnn()) << ". Expected Data or Optional of Data."));
+ return IGraphTransformer::TStatus::Error;
+ }
} else {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
TStringBuilder() << "Unsupported agg name: " << name));
diff --git a/ydb/library/yql/sql/v1/aggregation.cpp b/ydb/library/yql/sql/v1/aggregation.cpp
index 72079f08d1..1bf9ff4457 100644
--- a/ydb/library/yql/sql/v1/aggregation.cpp
+++ b/ydb/library/yql/sql/v1/aggregation.cpp
@@ -30,7 +30,8 @@ namespace {
}
static const THashSet<TString> AggApplyFuncs = {
- "count_traits_factory"
+ "count_traits_factory",
+ "sum_traits_factory"
};
class TAggregationFactory : public IAggregation {