summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <[email protected]>2023-09-01 13:25:53 +0300
committerqrort <[email protected]>2023-09-01 13:42:52 +0300
commitb7bb944233f923121a9fc3a7cdeaaaad25a9d6df (patch)
tree82a6c393004029b86e525ce541606a4fbddd65a2
parentd9f4e5381822b17ac7448b1f4386e1f74e118883 (diff)
KIKIMR-18656: Generate WithContext node for aggregates
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp29
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp15
2 files changed, 44 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index c2e4131a3fa..7d0a93a8ecd 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -2131,6 +2131,35 @@ Y_UNIT_TEST_SUITE(KqpPg) {
)", FormatResultSetYson(result.GetResultSet(0)));
}
}
+
+ Y_UNIT_TEST(PgAggregate) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ auto db = kikimr.GetQueryClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings()
+ .Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto tc = kikimr.GetTableClient();
+ auto session = tc.CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteSchemeQuery(R"(
+ --!syntax_pg
+ CREATE TABLE t(id int8 not null, primary key(id));
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO t(id) VALUES(1::int8), (2::int8), (3::int8);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT sum(id) FROM t;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([["6"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 232a6642cc9..0b0deae23f0 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -1079,6 +1079,21 @@ NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprCon
.Build()
.Done();
+ if (HasContextFuncs(*lambda.Ptr())) {
+ lambda = Build<TCoLambda>(ctx, aggCombine.Pos())
+ .Args({ TStringBuf("stream") })
+ .Body<TCoWithContext>()
+ .Input<TExprApplier>()
+ .Apply(lambda)
+ .With(0, TStringBuf("stream"))
+ .Build()
+ .Name()
+ .Value("Agg")
+ .Build()
+ .Build()
+ .Done();
+ }
+
auto result = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
if (!result) {
return node;