diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-10-06 19:17:04 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-10-06 22:41:08 +0300 |
commit | 623de1648b7853c6e07f28a4b701e50ed7721cd2 (patch) | |
tree | 107591d1d7a4caadf80129f8d366d138d6950fa3 | |
parent | d43fe5d9acd5d51793a66c6e59375851f51050d5 (diff) | |
download | ydb-623de1648b7853c6e07f28a4b701e50ed7721cd2.tar.gz |
KIKIMR-18531: add type ann for returning list
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasink.cpp | 40 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_expr_nodes.json | 8 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 41 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 60 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.h | 1 |
6 files changed, 151 insertions, 4 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index e63b42280f9..87f92fcca9e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -518,7 +518,7 @@ public: YQL_CVLOG(NLog::ELevel::ERROR, NLog::EComponent::ProviderKikimr) << "Skip RewriteIO for external entity: unknown entity type: " << (int)tableDesc.Metadata->ExternalSource.SourceType; return true; } - + if (mode != "insert_abort") { ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Write mode '" << static_cast<TStringBuf>(mode) << "' is not supported for external entities")); return false; @@ -584,7 +584,35 @@ public: if (resultNode) { return resultNode; } - + + if (!settings.ReturningList.IsValid()) { + settings.ReturningList = Build<TExprList>(ctx, node->Pos()).Done(); + } + + auto returningColumns = Build<TCoAtomList>(ctx, node->Pos()).Done(); + auto returningStar = Build<TCoAtom>(ctx, node->Pos()).Value("false").Done(); + + TVector<TExprBase> columnsToReturn; + for (const auto item : settings.ReturningList.Cast()) { + auto pgResultNode = item.Cast<TCoPgResultItem>(); + const auto value = pgResultNode.ExpandedColumns().Cast<TCoAtom>().Value(); + if (value.empty()) { + returningStar = Build<TCoAtom>(ctx, node->Pos()).Value("true").Done(); + break; + } else { + auto atom = Build<TCoAtom>(ctx, node->Pos()) + .Value(value) + .Done(); + columnsToReturn.emplace_back(std::move(atom)); + } + } + + if (!columnsToReturn.empty()) { + returningColumns = Build<TCoAtomList>(ctx, node->Pos()) + .Add(columnsToReturn) + .Done(); + } + if (mode == "drop" || mode == "drop_if_exists") { return MakeKiDropTable(node, settings, key, ctx); } else if (mode == "update") { @@ -596,6 +624,8 @@ public: .Table().Build(key.GetTablePath()) .Filter(settings.Filter.Cast()) .Update(settings.Update.Cast()) + .ReturningColumns(returningColumns) + .ReturningStar(returningStar) .Done() .Ptr(); } else { @@ -609,6 +639,8 @@ public: .Value("update_on") .Build() .Settings(settings.Other) + .ReturningColumns(returningColumns) + .ReturningStar(returningStar) .Done() .Ptr(); } @@ -632,6 +664,8 @@ public: .Value("delete_on") .Build() .Settings(settings.Other) + .ReturningColumns(returningColumns) + .ReturningStar(returningStar) .Done() .Ptr(); } @@ -643,6 +677,8 @@ public: .Input(node->Child(3)) .Mode(mode) .Settings(settings.Other) + .ReturningColumns(returningColumns) + .ReturningStar(returningStar) .Done() .Ptr(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index cb8b79e0e42..166ee1bc489 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -76,7 +76,9 @@ {"Index": 2, "Name": "Table", "Type": "TCoAtom"}, {"Index": 3, "Name": "Input", "Type": "TExprBase"}, {"Index": 4, "Name": "Mode", "Type": "TCoAtom"}, - {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"} + {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList"}, + {"Index": 6, "Name": "ReturningColumns", "Type": "TCoAtomList"}, + {"Index": 7, "Name": "ReturningStar", "Type": "TCoAtom"} ] }, { @@ -88,7 +90,9 @@ {"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"}, {"Index": 2, "Name": "Table", "Type": "TCoAtom"}, {"Index": 3, "Name": "Filter", "Type": "TCoLambda"}, - {"Index": 4, "Name": "Update", "Type": "TCoLambda"} + {"Index": 4, "Name": "Update", "Type": "TCoLambda"}, + {"Index": 5, "Name": "ReturningColumns", "Type": "TCoAtomList"}, + {"Index": 6, "Name": "ReturningStar", "Type": "TCoAtom"} ] }, { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 41689fa25b6..113e2757604 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -353,6 +353,7 @@ private: .Add(convertedValues) .Done(); + node.Ptr()->ChildRef(TKiWriteTable::idx_Input) = list.Ptr(); return TStatus::Repeat; } @@ -490,6 +491,26 @@ private: return status; } + bool sysColumnsEnabled = SessionCtx->Config().SystemColumnsEnabled(); + if (TString(node.ReturningStar()) == "true") { + node.Ptr()->ChildRef(TKiWriteTable::idx_ReturningColumns) = + BuildColumnsList(*table, node.Pos(), ctx, sysColumnsEnabled).Ptr(); + } + + auto selectType = GetReadTableRowType( + ctx, SessionCtx->Tables(), TString(node.DataSink().Cluster()), TString(node.Table().Value()), + node.ReturningColumns(), sysColumnsEnabled + ); + if (!selectType) { + return TStatus::Error; + } + + if (!node.ReturningColumns().Empty()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() + << "It is not allowed to use returning")); + return IGraphTransformer::TStatus::Error; + } + node.Ptr()->SetTypeAnn(node.World().Ref().GetTypeAnn()); return TStatus::Ok; } @@ -560,6 +581,26 @@ private: } } + bool sysColumnsEnabled = SessionCtx->Config().SystemColumnsEnabled(); + if (TString(node.ReturningStar()) == "true") { + node.Ptr()->ChildRef(TKiWriteTable::idx_ReturningColumns) = + BuildColumnsList(*table, node.Pos(), ctx, sysColumnsEnabled).Ptr(); + } + + auto selectType = GetReadTableRowType( + ctx, SessionCtx->Tables(), TString(node.DataSink().Cluster()), TString(node.Table().Value()), + node.ReturningColumns(), sysColumnsEnabled + ); + if (!selectType) { + return TStatus::Error; + } + + if (!node.ReturningColumns().Empty()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() + << "It is not allowed to use returning")); + return IGraphTransformer::TStatus::Error; + } + auto updateBody = node.Update().Body().Ptr(); auto status = ConvertTableRowType(updateBody, *table, ctx); if (status != IGraphTransformer::TStatus::Ok) { diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 36a0b2e8ba7..6dfe85181d1 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1327,6 +1327,66 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(ReturningTypeAnn) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + + const auto queryCreate = Q_(R"( + --!syntax_pg + CREATE TABLE ReturningTable ( + key serial PRIMARY KEY, + value int4))"); + + auto resultCreate = session.ExecuteSchemeQuery(queryCreate).GetValueSync(); + UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString()); + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO ReturningTable (value) VALUES(2) RETURNING key; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT(result.GetIssues().ToString(true) == "{ <main>: Error: Type annotation, code: 1030 subissue: { <main>:1:1: Error: At function: KiWriteTable! subissue: { <main>:1:1: Error: It is not allowed to use returning } } }"); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO ReturningTable (value) VALUES(2) RETURNING key, value; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT(result.GetIssues().ToString(true) == "{ <main>: Error: Type annotation, code: 1030 subissue: { <main>:1:1: Error: At function: KiWriteTable! subissue: { <main>:1:1: Error: It is not allowed to use returning } } }"); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO ReturningTable (value) VALUES(2) RETURNING *; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT(result.GetIssues().ToString(true) == "{ <main>: Error: Type annotation, code: 1030 subissue: { <main>:1:1: Error: At function: KiWriteTable! subissue: { <main>:1:1: Error: It is not allowed to use returning } } }"); + } + + { + const auto query = Q_(R"( + --!syntax_pg + INSERT INTO ReturningTable (value) VALUES(2) RETURNING fake; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT(result.GetIssues().ToString(true) == "{ <main>: Error: Type annotation, code: 1030 subissue: { <main>:1:1: Error: At function: KiWriteTable! subissue: { <main>:1:1: Error: Column not found: fake } } }"); + } + } + Y_UNIT_TEST(DropTablePg) { TKikimrRunner kikimr; auto client = kikimr.GetTableClient(); diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 4ffac97d621..d6ee8320212 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -225,6 +225,7 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) { TMaybeNode<TCoAtom> mode; TMaybeNode<TCoAtom> temporary; TMaybeNode<TExprList> columns; + TMaybeNode<TExprList> returningList; TMaybeNode<TCoAtomList> primaryKey; TMaybeNode<TCoAtomList> notNullColumns; TMaybeNode<TCoAtomList> serialColumns; @@ -346,6 +347,9 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) { pgFilter = tuple.Value().Cast<TCallable>(); } else if (name == "temporary") { temporary = Build<TCoAtom>(ctx, node.Pos()).Value("true").Done(); + } else if (name == "returning") { + YQL_ENSURE(tuple.Value().Maybe<TExprList>()); + returningList = tuple.Value().Cast<TExprList>(); } else { other.push_back(tuple); } @@ -384,6 +388,7 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) { ret.Mode = mode; ret.Temporary = temporary; ret.Columns = columns; + ret.ReturningList = returningList; ret.PrimaryKey = primaryKey; ret.NotNullColumns = notNullColumns; ret.SerialColumns = serialColumns; diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index 57f2fa1e6a0..8bc6777edba 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -36,6 +36,7 @@ struct TWriteTableSettings { NNodes::TMaybeNode<NNodes::TCoAtom> Mode; NNodes::TMaybeNode<NNodes::TCoAtom> Temporary; NNodes::TMaybeNode<NNodes::TExprList> Columns; + NNodes::TMaybeNode<NNodes::TExprList> ReturningList; NNodes::TMaybeNode<NNodes::TCoAtomList> PrimaryKey; NNodes::TMaybeNode<NNodes::TCoAtomList> NotNullColumns; NNodes::TMaybeNode<NNodes::TCoAtomList> SerialColumns; |