diff options
author | qrort <qrort@yandex-team.com> | 2023-08-18 14:07:53 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2023-08-18 17:31:38 +0300 |
commit | bef0f2ad88793fb7cd768695edeac498cb4e1dcf (patch) | |
tree | f6b090f269d716509d5a9d219fa410e745edaf43 | |
parent | 566cf4ffcdaee43f3da7104b99d3dd418ec9fb74 (diff) | |
download | ydb-bef0f2ad88793fb7cd768695edeac498cb4e1dcf.tar.gz |
KIKIMR-17183: pg updates in ydb
-rw-r--r-- | ydb/core/engine/mkql_proto.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_datasink.cpp | 56 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 48 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 142 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_pgselect.cpp | 49 | ||||
-rw-r--r-- | ydb/library/yql/core/expr_nodes/yql_expr_nodes.json | 27 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_columnorder.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_pg.cpp | 40 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.cpp | 8 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg/pg_sql.cpp | 38 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg/pg_sql_ut.cpp | 22 |
12 files changed, 402 insertions, 67 deletions
diff --git a/ydb/core/engine/mkql_proto.cpp b/ydb/core/engine/mkql_proto.cpp index 1db52c6e37..4b04740b06 100644 --- a/ydb/core/engine/mkql_proto.cpp +++ b/ydb/core/engine/mkql_proto.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/public/decimal/yql_decimal.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h> #include <ydb/core/scheme_types/scheme_types_defs.h> @@ -152,10 +153,22 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType, break; } case NScheme::NTypeIds::Pg: - // TODO: support pg types - CHECK_OR_RETURN_ERROR(false, Sprintf("Unsupported pg type at position %" PRIu32, i)); + { + if (v.HasBytes()) { + c = TCell(v.GetBytes().data(), v.GetBytes().size()); + } else if (v.HasText()) { + auto typeDesc = types[i].GetTypeDesc(); + auto convert = NPg::PgNativeBinaryFromNativeText(v.GetText(), NPg::PgTypeIdFromTypeDesc(typeDesc)); + if (convert.Error) { + CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Pg: %s in tuple at position %" PRIu32, convert.Error->data(), i)); + } else { + c = TCell(convert.Str.data(), convert.Str.size()); + } + } else { + CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Pg in tuple at position %" PRIu32, i)); + } break; - + } default: CHECK_OR_RETURN_ERROR(false, Sprintf("Unsupported typeId %" PRIu16 " at index %" PRIu32, typeId, i)); break; @@ -257,10 +270,15 @@ bool CellToValue(NScheme::TTypeInfo type, const TCell& c, NKikimrMiniKQL::TValue val.MutableOptional()->SetText(c.Data(), c.Size()); break; - case NScheme::NTypeIds::Pg: - // TODO: support pg types - errStr = "Unknown pg type"; - return false; + case NScheme::NTypeIds::Pg: { + auto convert = NPg::PgNativeTextFromNativeBinary(TString(c.Data(), c.Size()), NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc())); + if (convert.Error) { + errStr = *convert.Error; + return false; + } + val.MutableOptional()->SetText(convert.Str); + break; + } default: errStr = "Unknown type: " + ToString(typeId); diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index a7bb72eb29..4a62dec8ab 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -208,18 +208,20 @@ private: << "INSERT OR IGNORE is not yet supported for Kikimr.")); return TStatus::Error; } else if (mode == "update") { - if (!settings.Filter) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Filter option is required for table update.")); - return TStatus::Error; - } - if (!settings.Update) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Update option is required for table update.")); - return TStatus::Error; + if (!settings.PgFilter) { + if (!settings.Filter) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Filter option is required for table update.")); + return TStatus::Error; + } + if (!settings.Update) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Update option is required for table update.")); + return TStatus::Error; + } } SessionCtx->Tables().GetOrAddTable(TString(cluster), SessionCtx->GetDatabase(), key.GetTablePath()); return TStatus::Ok; } else if (mode == "delete") { - if (!settings.PgDelete && !settings.Filter) { + if (!settings.Filter && !settings.PgFilter) { ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Filter option is required for table delete.")); return TStatus::Error; } @@ -569,18 +571,32 @@ public: if (mode == "drop") { return MakeKiDropTable(node, settings, key, ctx); } else if (mode == "update") { - YQL_ENSURE(settings.Filter); - YQL_ENSURE(settings.Update); - return Build<TKiUpdateTable>(ctx, node->Pos()) - .World(node->Child(0)) - .DataSink(node->Child(1)) - .Table().Build(key.GetTablePath()) - .Filter(settings.Filter.Cast()) - .Update(settings.Update.Cast()) - .Done() - .Ptr(); + if (settings.Filter) { + YQL_ENSURE(settings.Update); + return Build<TKiUpdateTable>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .Table().Build(key.GetTablePath()) + .Filter(settings.Filter.Cast()) + .Update(settings.Update.Cast()) + .Done() + .Ptr(); + } else { + YQL_ENSURE(settings.PgFilter); + return Build<TKiWriteTable>(ctx, node->Pos()) + .World(node->Child(0)) + .DataSink(node->Child(1)) + .Table().Build(key.GetTablePath()) + .Input(settings.PgFilter.Cast()) + .Mode() + .Value("update_on") + .Build() + .Settings(settings.Other) + .Done() + .Ptr(); + } } else if (mode == "delete") { - YQL_ENSURE(settings.Filter || settings.PgDelete); + YQL_ENSURE(settings.Filter || settings.PgFilter); if (settings.Filter) { return Build<TKiDeleteTable>(ctx, node->Pos()) .World(node->Child(0)) @@ -594,7 +610,7 @@ public: .World(node->Child(0)) .DataSink(node->Child(1)) .Table().Build(key.GetTablePath()) - .Input(settings.PgDelete.Cast()) + .Input(settings.PgFilter.Cast()) .Mode() .Value("delete_on") .Build() diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index f93de42194..aa832e7066 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -287,6 +287,48 @@ namespace { } return true; } + + bool ValidatePgUpdateKeys(const TKiWriteTable& node, const TKikimrTableDescription* table, TExprContext& ctx) { + bool ok = true; + auto updateKeyCheck = [&](const TStringBuf& colName) { + if (table->GetKeyColumnIndex(TString(colName))) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() + << "Cannot update primary key column: " << colName)); + ok = false; + } + }; + auto pgSelect = node.Input().Cast<TCoPgSelect>(); + for (const auto& option : pgSelect.SelectOptions()) { + if (option.Name() == "set_items") { + auto pgSetItems = option.Value().Cast<TExprList>(); + for (const auto& setItem : pgSetItems) { + auto setItemNode = setItem.Cast<TCoPgSetItem>(); + for (const auto& setItemOption : setItemNode.SetItemOptions()) { + if (setItemOption.Name() == "result") { + auto resultList = setItemOption.Value().Cast<TExprList>(); + bool skipStar = true; + for (const auto& pgResultItem : resultList) { + if (skipStar) { + skipStar = false; + continue; + } + auto pgResultNode = pgResultItem.Cast<TCoPgResultItem>(); + if (pgResultNode.ExpandedColumns().Maybe<TExprList>()) { + auto list = pgResultNode.ExpandedColumns().Cast<TExprList>(); + for (const auto& item : list) { + updateKeyCheck(item.Cast<TCoAtom>().Value()); + } + } else { + updateKeyCheck(pgResultNode.ExpandedColumns().Cast<TCoAtom>().Value()); + } + } + } + } + } + } + } + return ok; + } } class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer @@ -421,6 +463,12 @@ private: } } } else if (op == TYdbOperation::UpdateOn) { + if (TCoPgSelect::Match(node.Input().Ptr().Get())) { + auto ok = ValidatePgUpdateKeys(node, table, ctx); + if (!ok) { + return TStatus::Error; + } + } for (const auto& item : rowType->GetItems()) { auto column = table->Metadata->Columns.FindPtr(TString(item->GetName())); YQL_ENSURE(column); diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 7741729b7a..791e41637b 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1676,6 +1676,148 @@ Y_UNIT_TEST_SUITE(KqpPg) { CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); } } + + Y_UNIT_TEST(PgUpdate) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE test ( + key int4 PRIMARY KEY, + value int4 + ))"); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO test (key, value) VALUES (120, 120), (121, 121), (122, 122), (123, 123); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE test SET value = 122 WHERE key = 123; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM test; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"( + [["120";"120"];["121";"121"];["122";"122"];["123";"122"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE test SET key = key, value = 121 WHERE key = 123; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key")); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE test SET key = 12 WHERE key = 123; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key")); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE test SET value = key + 10; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM test; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"( + [["120";"130"];["121";"131"];["122";"132"];["123";"133"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(PgUpdateCompoundKey) { + TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE test ( + key1 int4, + key2 int4, + value int4, + primary key (key1, key2) + ))"); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO test (key1, key2, value) VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE test SET key1 = 1, key2 = 2, value = 2 WHERE key1 = 1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key1")); + UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key2")); + } + { + kikimr.GetTestClient().CreateTable("/Root", R"( + Name: "PgTwoShard" + Columns { Name: "key", Type: "pgint4", NotNull: true } + Columns { Name: "value", Type: "pgint4" } + KeyColumnNames: ["key"], + SplitBoundary { KeyPrefix { Tuple { Optional { Text: "100" } } } } + )"); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + auto describeResult = session.DescribeTable( + "/Root/PgTwoShard", + TDescribeTableSettings().WithTableStatistics(true).WithKeyShardBoundary(true) + ).GetValueSync(); + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(describeResult.GetTableDescription().GetPartitionsCount(), 2); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO PgTwoShard (key, value) VALUES (10, 10), (110, 110); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE PgTwoShard SET value = key + 1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM PgTwoShard ORDER BY key; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"( + [["10";"11"];["110";"111"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKqp diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp index 43a3c81ebd..218c5877c5 100644 --- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp @@ -1610,31 +1610,49 @@ TExprNode::TPtr BuildCrossJoinsBetweenGroups(TPositionHandle pos, const TExprNod return ctx.NewCallable(pos, "EquiJoin", std::move(args)); } -TExprNode::TPtr BuildProjectionLambda(TPositionHandle pos, const TExprNode::TPtr& result, bool subLink, TExprContext& ctx) { +TExprNode::TPtr BuildProjectionLambda(TPositionHandle pos, const TExprNode::TPtr& result, bool subLink, bool emitPgStar, TExprContext& ctx) { return ctx.Builder(pos) .Lambda() .Param("row") .Callable("AsStruct") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { ui32 index = 0; + THashMap<TString, TExprNode*> overrideColumns; + if (emitPgStar) { + for (const auto& x : result->Tail().Children()) { + if (x->HeadPtr()->IsAtom()) { + overrideColumns.emplace(TString(x->HeadPtr()->Content()), x.Get()); + } + } + } + auto addAtomToList = [] (TExprNodeBuilder& listBuilder, TExprNode* x) -> void { + listBuilder.Add(0, x->HeadPtr()); + listBuilder.Apply(1, x->TailPtr()) + .With(0, "row") + .Seal(); + listBuilder.Seal(); + }; for (const auto& x : result->Tail().Children()) { if (x->HeadPtr()->IsAtom()) { - auto listBuilder = parent.List(index++); - listBuilder.Add(0, x->HeadPtr()); - listBuilder.Apply(1, x->TailPtr()) - .With(0, "row") - .Seal(); - listBuilder.Seal(); + if (!emitPgStar) { + auto listBuilder = parent.List(index++); + addAtomToList(listBuilder, x.Get()); + } } else { auto type = x->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); for (ui32 i = 0; i < type->GetSize(); ++i) { auto column = type->GetItems()[i]->GetName(); + auto columnName = subLink ? column : NTypeAnnImpl::RemoveAlias(column); auto listBuilder = parent.List(index++); - listBuilder.Atom(0, subLink ? column : NTypeAnnImpl::RemoveAlias(column)); - listBuilder.Callable(1, "Member") - .Arg(0, "row") - .Atom(1, column); - listBuilder.Seal(); + if (overrideColumns.contains(columnName)) { + addAtomToList(listBuilder, overrideColumns[columnName]); + } else { + listBuilder.Atom(0, columnName); + listBuilder.Callable(1, "Member") + .Arg(0, "row") + .Atom(1, column); + listBuilder.Seal(); + } } } } @@ -3005,6 +3023,7 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct auto extraSortColumns = GetSetting(setItem->Tail(), "final_extra_sort_columns"); auto extraSortKeys = GetSetting(setItem->Tail(), "final_extra_sort_keys"); auto targetColumns = GetSetting(setItem->Tail(), "target_columns"); + bool emitPgStar = (GetSetting(setItem->Tail(), "emit_pg_star") != nullptr); bool oneRow = !from; TExprNode::TPtr list; if (values) { @@ -3013,7 +3032,7 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct } else { YQL_ENSURE(result); YQL_ENSURE(!targetColumns, "target columns for projection are not supported yet"); - TExprNode::TPtr projectionLambda = BuildProjectionLambda(node->Pos(), result, subLinkId.Defined(), ctx); + TExprNode::TPtr projectionLambda = BuildProjectionLambda(node->Pos(), result, subLinkId.Defined(), emitPgStar, ctx); TExprNode::TPtr projectionArg = projectionLambda->Head().HeadPtr(); TExprNode::TPtr projectionRoot = projectionLambda->TailPtr(); TVector<TString> inputAliases; @@ -3244,8 +3263,8 @@ TExprNode::TPtr ExpandPgLike(const TExprNode::TPtr& node, TExprContext& ctx, TOp const bool insensitive = node->IsCallable("PgILike"); if (!insensitive) { auto pattern = node->Child(1); - if (pattern->IsCallable("PgConst") && - pattern->Tail().IsCallable("PgType") && + if (pattern->IsCallable("PgConst") && + pattern->Tail().IsCallable("PgType") && pattern->Tail().Head().Content() == "text") { auto str = pattern->Head().Content(); auto hasUnderscore = AnyOf(str, [](char c) { return c == '_'; }); diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index c2f130d0b9..fd2f837316 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -2367,6 +2367,33 @@ "Name": "TCoWideFromBlocks", "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "WideFromBlocks"} + }, + + { + "Name": "TCoPgSelect", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "PgSelect"}, + "Children": [ + {"Index": 0, "Name": "SelectOptions", "Type": "TCoNameValueTupleList"} + ] + }, + { + "Name": "TCoPgSetItem", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "PgSetItem"}, + "Children": [ + {"Index": 0, "Name": "SetItemOptions", "Type": "TCoNameValueTupleList"} + ] + }, + { + "Name": "TCoPgResultItem", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "PgResultItem"}, + "Children": [ + {"Index": 0, "Name": "ExpandedColumns", "Type": "TExprBase"}, + {"Index": 1, "Name": "Type", "Type": "TCallable"}, + {"Index": 2, "Name": "Lambda", "Type": "TCoLambda"} + ] } ] } diff --git a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp index 9befa13e82..07c29b4009 100644 --- a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp @@ -49,13 +49,16 @@ IGraphTransformer::TStatus OrderForPgSetItem(const TExprNode::TPtr& node, TExprN } } else { auto result = GetSetting(node->Tail(), "result"); + auto emitPgStar = GetSetting(node->Tail(), "emit_pg_star"); if (result) { for (size_t i = 0; i < result->Tail().ChildrenSize(); i++) { auto col = result->Tail().Child(i); if (col->Head().IsAtom()) { auto alias = TString(col->Head().Content()); YQL_ENSURE(!alias.empty()); - columnOrder.push_back(alias); + if (!emitPgStar) { + columnOrder.push_back(alias); + } } else { YQL_ENSURE(col->Head().IsList()); diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp index f2086dfb64..2fe9e5941d 100644 --- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp @@ -1209,7 +1209,7 @@ void ScanSublinks(TExprNode::TPtr root, TNodeSet& sublinks) { bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>& possibleAliases, bool* hasStar, bool& hasColumnRef, THashSet<TString>& refs, THashMap<TString, THashSet<TString>>* qualifiedRefs, - TExtContext& ctx, bool scanColumnsOnly) { + TExtContext& ctx, bool scanColumnsOnly, bool hasEmitPgStar = false) { bool isError = false; VisitExpr(root, [&](const TExprNode::TPtr& node) { if (node->IsCallable("PgSubLink")) { @@ -1290,7 +1290,7 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>& } } } else if (node->IsCallable("PgColumnRef")) { - if (hasStar && *hasStar) { + if (hasStar && *hasStar && !hasEmitPgStar) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference")); isError = true; return false; @@ -2591,6 +2591,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN bool hasDistinctAll = false; bool hasDistinctOn = false; bool hasFinalExtraSortColumns = false; + bool hasEmitPgStar = false; TExprNode::TPtr groupExprs; TExprNode::TPtr result; TExprNode::TPtr targetColumns; @@ -2619,7 +2620,15 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN } const auto optionName = option->Head().Content(); - if (optionName == "ext_types" || optionName == "final_ext_types") { + if (optionName == "emit_pg_star") { + if (option->ChildrenSize() > 1) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()), + "Non-empty emit_pg_star option is not allowed")); + return IGraphTransformer::TStatus::Error; + } + hasEmitPgStar = true; + } + else if (optionName == "ext_types" || optionName == "final_ext_types") { if (pass != 2) { continue; } @@ -2720,6 +2729,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN } } + THashMap<TString, size_t> outputItemIndex; TVector<const TItemExprType*> outputItems; TExprNode::TListType newResult; bool hasNewResult = false; @@ -2807,7 +2817,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN ScanSublinks(lambda.TailPtr(), sublinks); if (!ScanColumns(lambda.TailPtr(), joinInputs, possibleAliases, &hasStar, hasColumnRef, - refs, &qualifiedRefs, ctx, scanColumnsOnly)) { + refs, &qualifiedRefs, ctx, scanColumnsOnly, hasEmitPgStar)) { return IGraphTransformer::TStatus::Error; } @@ -2877,7 +2887,16 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN TStringBuf columnName = targetColumns ? targetColumns->Child(index)->Content() : column->Head().Content(); - outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(columnName, column->Tail().GetTypeAnn())); + auto itemExpr = ctx.Expr.MakeType<TItemExprType>(columnName, column->Tail().GetTypeAnn()); + if (hasEmitPgStar) { + if (!outputItemIndex.contains(columnName)) { + outputItemIndex.emplace(columnName, outputItems.size()); + outputItems.emplace_back(); + } + outputItems[outputItemIndex[columnName]] = itemExpr; + } else { + outputItems.emplace_back(itemExpr); + } } else { // star or qualified star size_t index = 0; @@ -2889,7 +2908,16 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN itemRef->GetItemType() ); } - outputItems.push_back(itemRef); + if (hasEmitPgStar) { + const auto& name = itemRef->GetName(); + if (!outputItemIndex.contains(name)) { + outputItemIndex.emplace(name, outputItems.size()); + outputItems.emplace_back(); + } + outputItems[outputItemIndex[name]] = itemRef; + } else { + outputItems.emplace_back(itemRef); + } } } diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 00c8ba8b80..dd3087c062 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -220,7 +220,7 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) { TVector<TCoNameValueTuple> tableSettings; TVector<TCoNameValueTuple> alterActions; TMaybeNode<TCoAtom> tableType; - TMaybeNode<TCallable> pgDelete; + TMaybeNode<TCallable> pgFilter; for (auto child : node) { if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) { auto tuple = maybeTuple.Cast(); @@ -309,9 +309,9 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) { } else if (name == "serialColumns") { YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>()); serialColumns = tuple.Value().Cast<TCoAtomList>(); - } else if (name == "pg_delete") { + } else if (name == "pg_delete" || name == "pg_update") { YQL_ENSURE(tuple.Value().Maybe<TCallable>()); - pgDelete = tuple.Value().Cast<TCallable>(); + pgFilter = tuple.Value().Cast<TCallable>(); } else { other.push_back(tuple); } @@ -363,7 +363,7 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) { ret.TableSettings = tableProfileSettings; ret.AlterActions = alterTableActions; ret.TableType = tableType; - ret.PgDelete = pgDelete; + ret.PgFilter = pgFilter; return ret; } diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index f187dd0948..80ae635932 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -50,7 +50,7 @@ struct TWriteTableSettings { NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> TableSettings; NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> AlterActions; NNodes::TMaybeNode<NNodes::TCoAtom> TableType; - NNodes::TMaybeNode<NNodes::TCallable> PgDelete; + NNodes::TMaybeNode<NNodes::TCallable> PgFilter; TWriteTableSettings(const NNodes::TCoNameValueTupleList& other) : Other(other) {} diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index aec48e18eb..e1b90fdcbf 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -338,7 +338,13 @@ public: using TTraverseNodeStack = TStack<std::pair<const Node*, bool>>; [[nodiscard]] - TAstNode* ParseSelectStmt(const SelectStmt* value, bool inner, TVector <TAstNode*> targetColumns = {}, bool allowEmptyResSet = false) { + TAstNode* ParseSelectStmt( + const SelectStmt* value, + bool inner, + TVector <TAstNode*> targetColumns = {}, + bool allowEmptyResSet = false, + bool emitPgStar = false + ) { CTE.emplace_back(); Y_DEFER { CTE.pop_back(); @@ -420,7 +426,8 @@ public: } TVector<TAstNode*> setItemNodes; - for (const auto& x : setItems) { + for (size_t id = 0; id < setItems.size(); id++) { + const auto& x = setItems[id]; bool hasDistinctAll = false; TVector<TAstNode*> distinctOnItems; if (x->distinctClause) { @@ -661,6 +668,10 @@ public: TVector<TAstNode*> res; ui32 i = 0; + if (emitPgStar && id + 1 == setItems.size()) { + res.emplace_back(CreatePgStarResultItem()); + i++; + } for (int targetIndex = 0; targetIndex < ListLength(x->targetList); ++targetIndex) { auto node = ListNodeNth(x->targetList, targetIndex); if (NodeTag(node) != T_ResTarget) { @@ -728,7 +739,11 @@ public: val.push_back(QVL(row.data(), row.size())); } + TVector<TAstNode*> setItemOptions; + if (emitPgStar) { + setItemOptions.push_back(QL(QA("emit_pg_star"))); + } if (targetColumns) { setItemOptions.push_back(QL(QA("target_columns"), QVL(targetColumns.data(), targetColumns.size()))); } @@ -909,6 +924,12 @@ public: } [[nodiscard]] + TAstNode* CreatePgStarResultItem() { + TAstNode* starLambda = L(A("lambda"), QL(), L(A("PgStar"))); + return L(A("PgResultItem"), QAX(""), L(A("Void")), starLambda); + } + + [[nodiscard]] TAstNode* CreatePgResultItem(const ResTarget* r, TAstNode* x, ui32& columnIndex) { bool isStar = false; if (NodeTag(r->val) == T_ColumnRef) { @@ -1056,7 +1077,13 @@ public: .whereClause = value->whereClause, .withClause = value->withClause, }; - const auto select = ParseSelectStmt(&selectStmt, /* inner */ true, {}, /* allowEmptyResSet */ true); + const auto select = ParseSelectStmt( + &selectStmt, + /* inner */ true, + /* targetColumns */{}, + /* allowEmptyResSet */ true, + /*emitPgStar=*/true + ); if (!select) { return nullptr; } @@ -1732,12 +1759,9 @@ public: } } - TAstNode* starLambda = L(A("lambda"), QL(), L(A("PgStar"))); - TAstNode* resultItem = L(A("PgResultItem"), QAX(""), L(A("Void")), starLambda); - TVector<TAstNode*> setItemOptions; - setItemOptions.push_back(QL(QA("result"), QVL(resultItem))); + setItemOptions.push_back(QL(QA("result"), QVL(CreatePgStarResultItem()))); setItemOptions.push_back(QL(QA("from"), QVL(fromList.data(), fromList.size()))); setItemOptions.push_back(QL(QA("join_ops"), QVL(QL()))); diff --git a/ydb/library/yql/sql/pg/pg_sql_ut.cpp b/ydb/library/yql/sql/pg/pg_sql_ut.cpp index ee66f25064..45f49cb005 100644 --- a/ydb/library/yql/sql/pg/pg_sql_ut.cpp +++ b/ydb/library/yql/sql/pg/pg_sql_ut.cpp @@ -408,17 +408,27 @@ Y_UNIT_TEST_SUITE(PgSqlParsingOnly) { Y_UNIT_TEST(UpdateStmt) { auto res = PgSqlToYql("UPDATE plato.Input SET kind = 'test' where kind = 'testtest'"); + Cerr << res.Root->ToString(); TString updateStmtProg = R"( ( (let world (Configure! world (DataSource 'config) 'OrderedColumns)) (let read0 (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"input"))) (Void) '())) (let world (Left! read0)) - (let world (block '( - (let update_select (PgSelect '('('set_items '((PgSetItem '('('result '((PgResultItem '"kind" (Void) (lambda '() (PgConst '"test" (PgType 'text)))))) '('from '('((Right! read0) '"input" '()))) '('join_ops '('())) '('where (PgWhere (Void) (lambda '() (PgOp '"=" (PgColumnRef '"kind") (PgConst '"testtest" (PgType 'text)))))))))) '('set_ops '('push))))) - (let sink (DataSink '"yt" '"plato")) - (let key (Key '('table (String '"input")))) - (return (Write! world sink key (Void) '('('pg_update update_select) '('mode 'update)))) - ))) + (let world (block '((let update_select + (PgSelect '( + '('set_items '((PgSetItem + '('('emit_pg_star) + '('result '((PgResultItem '"" (Void) (lambda '() (PgStar))) + (PgResultItem '"kind" (Void) (lambda '() (PgConst '"test" (PgType 'text)))))) + '('from '('((Right! read0) '"input" '()))) + '('join_ops '('())) + '('where (PgWhere (Void) (lambda '() (PgOp '"=" (PgColumnRef '"kind") (PgConst '"testtest" (PgType 'text)))))))))) + '('set_ops '('push))) + ) + ) + (let sink (DataSink '"yt" '"plato")) + (let key (Key '('table (String '"input")))) + (return (Write! world sink key (Void) '('('pg_update update_select) '('mode 'update))))))) (let world (CommitAll! world)) (return world) ) |