diff options
author | qrort <[email protected]> | 2023-09-01 13:25:53 +0300 |
---|---|---|
committer | qrort <[email protected]> | 2023-09-01 13:42:52 +0300 |
commit | b7bb944233f923121a9fc3a7cdeaaaad25a9d6df (patch) | |
tree | 82a6c393004029b86e525ce541606a4fbddd65a2 | |
parent | d9f4e5381822b17ac7448b1f4386e1f74e118883 (diff) |
KIKIMR-18656: Generate WithContext node for aggregates
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 29 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 15 |
2 files changed, 44 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index c2e4131a3fa..7d0a93a8ecd 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -2131,6 +2131,35 @@ Y_UNIT_TEST_SUITE(KqpPg) { )", FormatResultSetYson(result.GetResultSet(0))); } } + + Y_UNIT_TEST(PgAggregate) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto tc = kikimr.GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteSchemeQuery(R"( + --!syntax_pg + CREATE TABLE t(id int8 not null, primary key(id)); + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO t(id) VALUES(1::int8), (2::int8), (3::int8); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT sum(id) FROM t; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([["6"]])", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKqp diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 232a6642cc9..0b0deae23f0 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -1079,6 +1079,21 @@ NNodes::TExprBase DqPushAggregateCombineToStage(NNodes::TExprBase node, TExprCon .Build() .Done(); + if (HasContextFuncs(*lambda.Ptr())) { + lambda = Build<TCoLambda>(ctx, aggCombine.Pos()) + .Args({ TStringBuf("stream") }) + .Body<TCoWithContext>() + .Input<TExprApplier>() + .Apply(lambda) + .With(0, TStringBuf("stream")) + .Build() + .Name() + .Value("Agg") + .Build() + .Build() + .Done(); + } + auto result = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx); if (!result) { return node; |