aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2023-08-18 14:07:53 +0300
committerqrort <qrort@yandex-team.com>2023-08-18 17:31:38 +0300
commitbef0f2ad88793fb7cd768695edeac498cb4e1dcf (patch)
treef6b090f269d716509d5a9d219fa410e745edaf43
parent566cf4ffcdaee43f3da7104b99d3dd418ec9fb74 (diff)
downloadydb-bef0f2ad88793fb7cd768695edeac498cb4e1dcf.tar.gz
KIKIMR-17183: pg updates in ydb
-rw-r--r--ydb/core/engine/mkql_proto.cpp32
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp56
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp48
-rw-r--r--ydb/core/kqp/ut/pg/kqp_pg_ut.cpp142
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_pgselect.cpp49
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json27
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_columnorder.cpp5
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.cpp40
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.cpp8
-rw-r--r--ydb/library/yql/providers/common/provider/yql_provider.h2
-rw-r--r--ydb/library/yql/sql/pg/pg_sql.cpp38
-rw-r--r--ydb/library/yql/sql/pg/pg_sql_ut.cpp22
12 files changed, 402 insertions, 67 deletions
diff --git a/ydb/core/engine/mkql_proto.cpp b/ydb/core/engine/mkql_proto.cpp
index 1db52c6e37..4b04740b06 100644
--- a/ydb/core/engine/mkql_proto.cpp
+++ b/ydb/core/engine/mkql_proto.cpp
@@ -6,6 +6,7 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/decimal/yql_decimal.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h>
#include <ydb/core/scheme_types/scheme_types_defs.h>
@@ -152,10 +153,22 @@ bool CellsFromTuple(const NKikimrMiniKQL::TType* tupleType,
break;
}
case NScheme::NTypeIds::Pg:
- // TODO: support pg types
- CHECK_OR_RETURN_ERROR(false, Sprintf("Unsupported pg type at position %" PRIu32, i));
+ {
+ if (v.HasBytes()) {
+ c = TCell(v.GetBytes().data(), v.GetBytes().size());
+ } else if (v.HasText()) {
+ auto typeDesc = types[i].GetTypeDesc();
+ auto convert = NPg::PgNativeBinaryFromNativeText(v.GetText(), NPg::PgTypeIdFromTypeDesc(typeDesc));
+ if (convert.Error) {
+ CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Pg: %s in tuple at position %" PRIu32, convert.Error->data(), i));
+ } else {
+ c = TCell(convert.Str.data(), convert.Str.size());
+ }
+ } else {
+ CHECK_OR_RETURN_ERROR(false, Sprintf("Cannot parse value of type Pg in tuple at position %" PRIu32, i));
+ }
break;
-
+ }
default:
CHECK_OR_RETURN_ERROR(false, Sprintf("Unsupported typeId %" PRIu16 " at index %" PRIu32, typeId, i));
break;
@@ -257,10 +270,15 @@ bool CellToValue(NScheme::TTypeInfo type, const TCell& c, NKikimrMiniKQL::TValue
val.MutableOptional()->SetText(c.Data(), c.Size());
break;
- case NScheme::NTypeIds::Pg:
- // TODO: support pg types
- errStr = "Unknown pg type";
- return false;
+ case NScheme::NTypeIds::Pg: {
+ auto convert = NPg::PgNativeTextFromNativeBinary(TString(c.Data(), c.Size()), NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc()));
+ if (convert.Error) {
+ errStr = *convert.Error;
+ return false;
+ }
+ val.MutableOptional()->SetText(convert.Str);
+ break;
+ }
default:
errStr = "Unknown type: " + ToString(typeId);
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index a7bb72eb29..4a62dec8ab 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -208,18 +208,20 @@ private:
<< "INSERT OR IGNORE is not yet supported for Kikimr."));
return TStatus::Error;
} else if (mode == "update") {
- if (!settings.Filter) {
- ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Filter option is required for table update."));
- return TStatus::Error;
- }
- if (!settings.Update) {
- ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Update option is required for table update."));
- return TStatus::Error;
+ if (!settings.PgFilter) {
+ if (!settings.Filter) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Filter option is required for table update."));
+ return TStatus::Error;
+ }
+ if (!settings.Update) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Update option is required for table update."));
+ return TStatus::Error;
+ }
}
SessionCtx->Tables().GetOrAddTable(TString(cluster), SessionCtx->GetDatabase(), key.GetTablePath());
return TStatus::Ok;
} else if (mode == "delete") {
- if (!settings.PgDelete && !settings.Filter) {
+ if (!settings.Filter && !settings.PgFilter) {
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), "Filter option is required for table delete."));
return TStatus::Error;
}
@@ -569,18 +571,32 @@ public:
if (mode == "drop") {
return MakeKiDropTable(node, settings, key, ctx);
} else if (mode == "update") {
- YQL_ENSURE(settings.Filter);
- YQL_ENSURE(settings.Update);
- return Build<TKiUpdateTable>(ctx, node->Pos())
- .World(node->Child(0))
- .DataSink(node->Child(1))
- .Table().Build(key.GetTablePath())
- .Filter(settings.Filter.Cast())
- .Update(settings.Update.Cast())
- .Done()
- .Ptr();
+ if (settings.Filter) {
+ YQL_ENSURE(settings.Update);
+ return Build<TKiUpdateTable>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink(node->Child(1))
+ .Table().Build(key.GetTablePath())
+ .Filter(settings.Filter.Cast())
+ .Update(settings.Update.Cast())
+ .Done()
+ .Ptr();
+ } else {
+ YQL_ENSURE(settings.PgFilter);
+ return Build<TKiWriteTable>(ctx, node->Pos())
+ .World(node->Child(0))
+ .DataSink(node->Child(1))
+ .Table().Build(key.GetTablePath())
+ .Input(settings.PgFilter.Cast())
+ .Mode()
+ .Value("update_on")
+ .Build()
+ .Settings(settings.Other)
+ .Done()
+ .Ptr();
+ }
} else if (mode == "delete") {
- YQL_ENSURE(settings.Filter || settings.PgDelete);
+ YQL_ENSURE(settings.Filter || settings.PgFilter);
if (settings.Filter) {
return Build<TKiDeleteTable>(ctx, node->Pos())
.World(node->Child(0))
@@ -594,7 +610,7 @@ public:
.World(node->Child(0))
.DataSink(node->Child(1))
.Table().Build(key.GetTablePath())
- .Input(settings.PgDelete.Cast())
+ .Input(settings.PgFilter.Cast())
.Mode()
.Value("delete_on")
.Build()
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index f93de42194..aa832e7066 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -287,6 +287,48 @@ namespace {
}
return true;
}
+
+ bool ValidatePgUpdateKeys(const TKiWriteTable& node, const TKikimrTableDescription* table, TExprContext& ctx) {
+ bool ok = true;
+ auto updateKeyCheck = [&](const TStringBuf& colName) {
+ if (table->GetKeyColumnIndex(TString(colName))) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
+ << "Cannot update primary key column: " << colName));
+ ok = false;
+ }
+ };
+ auto pgSelect = node.Input().Cast<TCoPgSelect>();
+ for (const auto& option : pgSelect.SelectOptions()) {
+ if (option.Name() == "set_items") {
+ auto pgSetItems = option.Value().Cast<TExprList>();
+ for (const auto& setItem : pgSetItems) {
+ auto setItemNode = setItem.Cast<TCoPgSetItem>();
+ for (const auto& setItemOption : setItemNode.SetItemOptions()) {
+ if (setItemOption.Name() == "result") {
+ auto resultList = setItemOption.Value().Cast<TExprList>();
+ bool skipStar = true;
+ for (const auto& pgResultItem : resultList) {
+ if (skipStar) {
+ skipStar = false;
+ continue;
+ }
+ auto pgResultNode = pgResultItem.Cast<TCoPgResultItem>();
+ if (pgResultNode.ExpandedColumns().Maybe<TExprList>()) {
+ auto list = pgResultNode.ExpandedColumns().Cast<TExprList>();
+ for (const auto& item : list) {
+ updateKeyCheck(item.Cast<TCoAtom>().Value());
+ }
+ } else {
+ updateKeyCheck(pgResultNode.ExpandedColumns().Cast<TCoAtom>().Value());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ return ok;
+ }
}
class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer
@@ -421,6 +463,12 @@ private:
}
}
} else if (op == TYdbOperation::UpdateOn) {
+ if (TCoPgSelect::Match(node.Input().Ptr().Get())) {
+ auto ok = ValidatePgUpdateKeys(node, table, ctx);
+ if (!ok) {
+ return TStatus::Error;
+ }
+ }
for (const auto& item : rowType->GetItems()) {
auto column = table->Metadata->Columns.FindPtr(TString(item->GetName()));
YQL_ENSURE(column);
diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
index 7741729b7a..791e41637b 100644
--- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
+++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
@@ -1676,6 +1676,148 @@ Y_UNIT_TEST_SUITE(KqpPg) {
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
}
}
+
+ Y_UNIT_TEST(PgUpdate) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ auto db = kikimr.GetQueryClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings()
+ .Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto client = kikimr.GetTableClient();
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ const auto query = Q_(R"(
+ --!syntax_pg
+ CREATE TABLE test (
+ key int4 PRIMARY KEY,
+ value int4
+ ))");
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO test (key, value) VALUES (120, 120), (121, 121), (122, 122), (123, 123);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE test SET value = 122 WHERE key = 123;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM test;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"(
+ [["120";"120"];["121";"121"];["122";"122"];["123";"122"]]
+ )", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE test SET key = key, value = 121 WHERE key = 123;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key"));
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE test SET key = 12 WHERE key = 123;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key"));
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE test SET value = key + 10;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM test;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"(
+ [["120";"130"];["121";"131"];["122";"132"];["123";"133"]]
+ )", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(PgUpdateCompoundKey) {
+ TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
+ auto db = kikimr.GetQueryClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings()
+ .Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto client = kikimr.GetTableClient();
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ const auto query = Q_(R"(
+ --!syntax_pg
+ CREATE TABLE test (
+ key1 int4,
+ key2 int4,
+ value int4,
+ primary key (key1, key2)
+ ))");
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO test (key1, key2, value) VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE test SET key1 = 1, key2 = 2, value = 2 WHERE key1 = 1;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key1"));
+ UNIT_ASSERT(result.GetIssues().ToString().Contains("Cannot update primary key column: key2"));
+ }
+ {
+ kikimr.GetTestClient().CreateTable("/Root", R"(
+ Name: "PgTwoShard"
+ Columns { Name: "key", Type: "pgint4", NotNull: true }
+ Columns { Name: "value", Type: "pgint4" }
+ KeyColumnNames: ["key"],
+ SplitBoundary { KeyPrefix { Tuple { Optional { Text: "100" } } } }
+ )");
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ auto describeResult = session.DescribeTable(
+ "/Root/PgTwoShard",
+ TDescribeTableSettings().WithTableStatistics(true).WithKeyShardBoundary(true)
+ ).GetValueSync();
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(describeResult.GetTableDescription().GetPartitionsCount(), 2);
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO PgTwoShard (key, value) VALUES (10, 10), (110, 110);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE PgTwoShard SET value = key + 1;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ SELECT * FROM PgTwoShard ORDER BY key;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"(
+ [["10";"11"];["110";"111"]]
+ )", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
index 43a3c81ebd..218c5877c5 100644
--- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp
@@ -1610,31 +1610,49 @@ TExprNode::TPtr BuildCrossJoinsBetweenGroups(TPositionHandle pos, const TExprNod
return ctx.NewCallable(pos, "EquiJoin", std::move(args));
}
-TExprNode::TPtr BuildProjectionLambda(TPositionHandle pos, const TExprNode::TPtr& result, bool subLink, TExprContext& ctx) {
+TExprNode::TPtr BuildProjectionLambda(TPositionHandle pos, const TExprNode::TPtr& result, bool subLink, bool emitPgStar, TExprContext& ctx) {
return ctx.Builder(pos)
.Lambda()
.Param("row")
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
ui32 index = 0;
+ THashMap<TString, TExprNode*> overrideColumns;
+ if (emitPgStar) {
+ for (const auto& x : result->Tail().Children()) {
+ if (x->HeadPtr()->IsAtom()) {
+ overrideColumns.emplace(TString(x->HeadPtr()->Content()), x.Get());
+ }
+ }
+ }
+ auto addAtomToList = [] (TExprNodeBuilder& listBuilder, TExprNode* x) -> void {
+ listBuilder.Add(0, x->HeadPtr());
+ listBuilder.Apply(1, x->TailPtr())
+ .With(0, "row")
+ .Seal();
+ listBuilder.Seal();
+ };
for (const auto& x : result->Tail().Children()) {
if (x->HeadPtr()->IsAtom()) {
- auto listBuilder = parent.List(index++);
- listBuilder.Add(0, x->HeadPtr());
- listBuilder.Apply(1, x->TailPtr())
- .With(0, "row")
- .Seal();
- listBuilder.Seal();
+ if (!emitPgStar) {
+ auto listBuilder = parent.List(index++);
+ addAtomToList(listBuilder, x.Get());
+ }
} else {
auto type = x->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
for (ui32 i = 0; i < type->GetSize(); ++i) {
auto column = type->GetItems()[i]->GetName();
+ auto columnName = subLink ? column : NTypeAnnImpl::RemoveAlias(column);
auto listBuilder = parent.List(index++);
- listBuilder.Atom(0, subLink ? column : NTypeAnnImpl::RemoveAlias(column));
- listBuilder.Callable(1, "Member")
- .Arg(0, "row")
- .Atom(1, column);
- listBuilder.Seal();
+ if (overrideColumns.contains(columnName)) {
+ addAtomToList(listBuilder, overrideColumns[columnName]);
+ } else {
+ listBuilder.Atom(0, columnName);
+ listBuilder.Callable(1, "Member")
+ .Arg(0, "row")
+ .Atom(1, column);
+ listBuilder.Seal();
+ }
}
}
}
@@ -3005,6 +3023,7 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct
auto extraSortColumns = GetSetting(setItem->Tail(), "final_extra_sort_columns");
auto extraSortKeys = GetSetting(setItem->Tail(), "final_extra_sort_keys");
auto targetColumns = GetSetting(setItem->Tail(), "target_columns");
+ bool emitPgStar = (GetSetting(setItem->Tail(), "emit_pg_star") != nullptr);
bool oneRow = !from;
TExprNode::TPtr list;
if (values) {
@@ -3013,7 +3032,7 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct
} else {
YQL_ENSURE(result);
YQL_ENSURE(!targetColumns, "target columns for projection are not supported yet");
- TExprNode::TPtr projectionLambda = BuildProjectionLambda(node->Pos(), result, subLinkId.Defined(), ctx);
+ TExprNode::TPtr projectionLambda = BuildProjectionLambda(node->Pos(), result, subLinkId.Defined(), emitPgStar, ctx);
TExprNode::TPtr projectionArg = projectionLambda->Head().HeadPtr();
TExprNode::TPtr projectionRoot = projectionLambda->TailPtr();
TVector<TString> inputAliases;
@@ -3244,8 +3263,8 @@ TExprNode::TPtr ExpandPgLike(const TExprNode::TPtr& node, TExprContext& ctx, TOp
const bool insensitive = node->IsCallable("PgILike");
if (!insensitive) {
auto pattern = node->Child(1);
- if (pattern->IsCallable("PgConst") &&
- pattern->Tail().IsCallable("PgType") &&
+ if (pattern->IsCallable("PgConst") &&
+ pattern->Tail().IsCallable("PgType") &&
pattern->Tail().Head().Content() == "text") {
auto str = pattern->Head().Content();
auto hasUnderscore = AnyOf(str, [](char c) { return c == '_'; });
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index c2f130d0b9..fd2f837316 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -2367,6 +2367,33 @@
"Name": "TCoWideFromBlocks",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "WideFromBlocks"}
+ },
+
+ {
+ "Name": "TCoPgSelect",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "PgSelect"},
+ "Children": [
+ {"Index": 0, "Name": "SelectOptions", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
+ "Name": "TCoPgSetItem",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "PgSetItem"},
+ "Children": [
+ {"Index": 0, "Name": "SetItemOptions", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
+ "Name": "TCoPgResultItem",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "PgResultItem"},
+ "Children": [
+ {"Index": 0, "Name": "ExpandedColumns", "Type": "TExprBase"},
+ {"Index": 1, "Name": "Type", "Type": "TCallable"},
+ {"Index": 2, "Name": "Lambda", "Type": "TCoLambda"}
+ ]
}
]
}
diff --git a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp
index 9befa13e82..07c29b4009 100644
--- a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp
@@ -49,13 +49,16 @@ IGraphTransformer::TStatus OrderForPgSetItem(const TExprNode::TPtr& node, TExprN
}
} else {
auto result = GetSetting(node->Tail(), "result");
+ auto emitPgStar = GetSetting(node->Tail(), "emit_pg_star");
if (result) {
for (size_t i = 0; i < result->Tail().ChildrenSize(); i++) {
auto col = result->Tail().Child(i);
if (col->Head().IsAtom()) {
auto alias = TString(col->Head().Content());
YQL_ENSURE(!alias.empty());
- columnOrder.push_back(alias);
+ if (!emitPgStar) {
+ columnOrder.push_back(alias);
+ }
}
else {
YQL_ENSURE(col->Head().IsList());
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
index f2086dfb64..2fe9e5941d 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
@@ -1209,7 +1209,7 @@ void ScanSublinks(TExprNode::TPtr root, TNodeSet& sublinks) {
bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>& possibleAliases,
bool* hasStar, bool& hasColumnRef, THashSet<TString>& refs, THashMap<TString, THashSet<TString>>* qualifiedRefs,
- TExtContext& ctx, bool scanColumnsOnly) {
+ TExtContext& ctx, bool scanColumnsOnly, bool hasEmitPgStar = false) {
bool isError = false;
VisitExpr(root, [&](const TExprNode::TPtr& node) {
if (node->IsCallable("PgSubLink")) {
@@ -1290,7 +1290,7 @@ bool ScanColumns(TExprNode::TPtr root, TInputs& inputs, const THashSet<TString>&
}
}
} else if (node->IsCallable("PgColumnRef")) {
- if (hasStar && *hasStar) {
+ if (hasStar && *hasStar && !hasEmitPgStar) {
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(node->Pos()), "Star is incompatible to column reference"));
isError = true;
return false;
@@ -2591,6 +2591,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
bool hasDistinctAll = false;
bool hasDistinctOn = false;
bool hasFinalExtraSortColumns = false;
+ bool hasEmitPgStar = false;
TExprNode::TPtr groupExprs;
TExprNode::TPtr result;
TExprNode::TPtr targetColumns;
@@ -2619,7 +2620,15 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
}
const auto optionName = option->Head().Content();
- if (optionName == "ext_types" || optionName == "final_ext_types") {
+ if (optionName == "emit_pg_star") {
+ if (option->ChildrenSize() > 1) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()),
+ "Non-empty emit_pg_star option is not allowed"));
+ return IGraphTransformer::TStatus::Error;
+ }
+ hasEmitPgStar = true;
+ }
+ else if (optionName == "ext_types" || optionName == "final_ext_types") {
if (pass != 2) {
continue;
}
@@ -2720,6 +2729,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
}
}
+ THashMap<TString, size_t> outputItemIndex;
TVector<const TItemExprType*> outputItems;
TExprNode::TListType newResult;
bool hasNewResult = false;
@@ -2807,7 +2817,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
ScanSublinks(lambda.TailPtr(), sublinks);
if (!ScanColumns(lambda.TailPtr(), joinInputs, possibleAliases, &hasStar, hasColumnRef,
- refs, &qualifiedRefs, ctx, scanColumnsOnly)) {
+ refs, &qualifiedRefs, ctx, scanColumnsOnly, hasEmitPgStar)) {
return IGraphTransformer::TStatus::Error;
}
@@ -2877,7 +2887,16 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
TStringBuf columnName = targetColumns
? targetColumns->Child(index)->Content()
: column->Head().Content();
- outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(columnName, column->Tail().GetTypeAnn()));
+ auto itemExpr = ctx.Expr.MakeType<TItemExprType>(columnName, column->Tail().GetTypeAnn());
+ if (hasEmitPgStar) {
+ if (!outputItemIndex.contains(columnName)) {
+ outputItemIndex.emplace(columnName, outputItems.size());
+ outputItems.emplace_back();
+ }
+ outputItems[outputItemIndex[columnName]] = itemExpr;
+ } else {
+ outputItems.emplace_back(itemExpr);
+ }
} else {
// star or qualified star
size_t index = 0;
@@ -2889,7 +2908,16 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
itemRef->GetItemType()
);
}
- outputItems.push_back(itemRef);
+ if (hasEmitPgStar) {
+ const auto& name = itemRef->GetName();
+ if (!outputItemIndex.contains(name)) {
+ outputItemIndex.emplace(name, outputItems.size());
+ outputItems.emplace_back();
+ }
+ outputItems[outputItemIndex[name]] = itemRef;
+ } else {
+ outputItems.emplace_back(itemRef);
+ }
}
}
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index 00c8ba8b80..dd3087c062 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -220,7 +220,7 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) {
TVector<TCoNameValueTuple> tableSettings;
TVector<TCoNameValueTuple> alterActions;
TMaybeNode<TCoAtom> tableType;
- TMaybeNode<TCallable> pgDelete;
+ TMaybeNode<TCallable> pgFilter;
for (auto child : node) {
if (auto maybeTuple = child.Maybe<TCoNameValueTuple>()) {
auto tuple = maybeTuple.Cast();
@@ -309,9 +309,9 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) {
} else if (name == "serialColumns") {
YQL_ENSURE(tuple.Value().Maybe<TCoAtomList>());
serialColumns = tuple.Value().Cast<TCoAtomList>();
- } else if (name == "pg_delete") {
+ } else if (name == "pg_delete" || name == "pg_update") {
YQL_ENSURE(tuple.Value().Maybe<TCallable>());
- pgDelete = tuple.Value().Cast<TCallable>();
+ pgFilter = tuple.Value().Cast<TCallable>();
} else {
other.push_back(tuple);
}
@@ -363,7 +363,7 @@ TWriteTableSettings ParseWriteTableSettings(TExprList node, TExprContext& ctx) {
ret.TableSettings = tableProfileSettings;
ret.AlterActions = alterTableActions;
ret.TableType = tableType;
- ret.PgDelete = pgDelete;
+ ret.PgFilter = pgFilter;
return ret;
}
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h
index f187dd0948..80ae635932 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.h
+++ b/ydb/library/yql/providers/common/provider/yql_provider.h
@@ -50,7 +50,7 @@ struct TWriteTableSettings {
NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> TableSettings;
NNodes::TMaybeNode<NNodes::TCoNameValueTupleList> AlterActions;
NNodes::TMaybeNode<NNodes::TCoAtom> TableType;
- NNodes::TMaybeNode<NNodes::TCallable> PgDelete;
+ NNodes::TMaybeNode<NNodes::TCallable> PgFilter;
TWriteTableSettings(const NNodes::TCoNameValueTupleList& other)
: Other(other) {}
diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp
index aec48e18eb..e1b90fdcbf 100644
--- a/ydb/library/yql/sql/pg/pg_sql.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql.cpp
@@ -338,7 +338,13 @@ public:
using TTraverseNodeStack = TStack<std::pair<const Node*, bool>>;
[[nodiscard]]
- TAstNode* ParseSelectStmt(const SelectStmt* value, bool inner, TVector <TAstNode*> targetColumns = {}, bool allowEmptyResSet = false) {
+ TAstNode* ParseSelectStmt(
+ const SelectStmt* value,
+ bool inner,
+ TVector <TAstNode*> targetColumns = {},
+ bool allowEmptyResSet = false,
+ bool emitPgStar = false
+ ) {
CTE.emplace_back();
Y_DEFER {
CTE.pop_back();
@@ -420,7 +426,8 @@ public:
}
TVector<TAstNode*> setItemNodes;
- for (const auto& x : setItems) {
+ for (size_t id = 0; id < setItems.size(); id++) {
+ const auto& x = setItems[id];
bool hasDistinctAll = false;
TVector<TAstNode*> distinctOnItems;
if (x->distinctClause) {
@@ -661,6 +668,10 @@ public:
TVector<TAstNode*> res;
ui32 i = 0;
+ if (emitPgStar && id + 1 == setItems.size()) {
+ res.emplace_back(CreatePgStarResultItem());
+ i++;
+ }
for (int targetIndex = 0; targetIndex < ListLength(x->targetList); ++targetIndex) {
auto node = ListNodeNth(x->targetList, targetIndex);
if (NodeTag(node) != T_ResTarget) {
@@ -728,7 +739,11 @@ public:
val.push_back(QVL(row.data(), row.size()));
}
+
TVector<TAstNode*> setItemOptions;
+ if (emitPgStar) {
+ setItemOptions.push_back(QL(QA("emit_pg_star")));
+ }
if (targetColumns) {
setItemOptions.push_back(QL(QA("target_columns"), QVL(targetColumns.data(), targetColumns.size())));
}
@@ -909,6 +924,12 @@ public:
}
[[nodiscard]]
+ TAstNode* CreatePgStarResultItem() {
+ TAstNode* starLambda = L(A("lambda"), QL(), L(A("PgStar")));
+ return L(A("PgResultItem"), QAX(""), L(A("Void")), starLambda);
+ }
+
+ [[nodiscard]]
TAstNode* CreatePgResultItem(const ResTarget* r, TAstNode* x, ui32& columnIndex) {
bool isStar = false;
if (NodeTag(r->val) == T_ColumnRef) {
@@ -1056,7 +1077,13 @@ public:
.whereClause = value->whereClause,
.withClause = value->withClause,
};
- const auto select = ParseSelectStmt(&selectStmt, /* inner */ true, {}, /* allowEmptyResSet */ true);
+ const auto select = ParseSelectStmt(
+ &selectStmt,
+ /* inner */ true,
+ /* targetColumns */{},
+ /* allowEmptyResSet */ true,
+ /*emitPgStar=*/true
+ );
if (!select) {
return nullptr;
}
@@ -1732,12 +1759,9 @@ public:
}
}
- TAstNode* starLambda = L(A("lambda"), QL(), L(A("PgStar")));
- TAstNode* resultItem = L(A("PgResultItem"), QAX(""), L(A("Void")), starLambda);
-
TVector<TAstNode*> setItemOptions;
- setItemOptions.push_back(QL(QA("result"), QVL(resultItem)));
+ setItemOptions.push_back(QL(QA("result"), QVL(CreatePgStarResultItem())));
setItemOptions.push_back(QL(QA("from"), QVL(fromList.data(), fromList.size())));
setItemOptions.push_back(QL(QA("join_ops"), QVL(QL())));
diff --git a/ydb/library/yql/sql/pg/pg_sql_ut.cpp b/ydb/library/yql/sql/pg/pg_sql_ut.cpp
index ee66f25064..45f49cb005 100644
--- a/ydb/library/yql/sql/pg/pg_sql_ut.cpp
+++ b/ydb/library/yql/sql/pg/pg_sql_ut.cpp
@@ -408,17 +408,27 @@ Y_UNIT_TEST_SUITE(PgSqlParsingOnly) {
Y_UNIT_TEST(UpdateStmt) {
auto res = PgSqlToYql("UPDATE plato.Input SET kind = 'test' where kind = 'testtest'");
+ Cerr << res.Root->ToString();
TString updateStmtProg = R"(
(
(let world (Configure! world (DataSource 'config) 'OrderedColumns))
(let read0 (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"input"))) (Void) '()))
(let world (Left! read0))
- (let world (block '(
- (let update_select (PgSelect '('('set_items '((PgSetItem '('('result '((PgResultItem '"kind" (Void) (lambda '() (PgConst '"test" (PgType 'text)))))) '('from '('((Right! read0) '"input" '()))) '('join_ops '('())) '('where (PgWhere (Void) (lambda '() (PgOp '"=" (PgColumnRef '"kind") (PgConst '"testtest" (PgType 'text)))))))))) '('set_ops '('push)))))
- (let sink (DataSink '"yt" '"plato"))
- (let key (Key '('table (String '"input"))))
- (return (Write! world sink key (Void) '('('pg_update update_select) '('mode 'update))))
- )))
+ (let world (block '((let update_select
+ (PgSelect '(
+ '('set_items '((PgSetItem
+ '('('emit_pg_star)
+ '('result '((PgResultItem '"" (Void) (lambda '() (PgStar)))
+ (PgResultItem '"kind" (Void) (lambda '() (PgConst '"test" (PgType 'text))))))
+ '('from '('((Right! read0) '"input" '())))
+ '('join_ops '('()))
+ '('where (PgWhere (Void) (lambda '() (PgOp '"=" (PgColumnRef '"kind") (PgConst '"testtest" (PgType 'text))))))))))
+ '('set_ops '('push)))
+ )
+ )
+ (let sink (DataSink '"yt" '"plato"))
+ (let key (Key '('table (String '"input"))))
+ (return (Write! world sink key (Void) '('('pg_update update_select) '('mode 'update)))))))
(let world (CommitAll! world))
(return world)
)