diff options
author | qrort <qrort@ydb.tech> | 2023-12-15 16:47:41 +0300 |
---|---|---|
committer | qrort <qrort@ydb.tech> | 2023-12-15 19:01:38 +0300 |
commit | 43626472bfb03f6055dc11277177e511a7cbb41a (patch) | |
tree | dadfe65ea5e298d017eaf866707e1bb25575dead | |
parent | 2fbf87b4f7c683af7e03073faa58a37746299a33 (diff) | |
download | ydb-43626472bfb03f6055dc11277177e511a7cbb41a.tar.gz |
KIKIMR-19452: Pg insert from selection by column order
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann_pg.cpp | 130 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann_pg.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 151 | ||||
-rw-r--r-- | ydb/library/yql/core/common_opt/yql_co_pgselect.cpp | 13 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_columnorder.cpp | 59 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_pg.cpp | 45 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_type_annotation.cpp | 10 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_type_annotation.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.cpp | 143 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/provider/yql_provider.h | 18 | ||||
-rw-r--r-- | ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp | 35 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg/pg_sql.cpp | 3 |
14 files changed, 395 insertions, 241 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 55a44efa37..adac649485 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -380,11 +380,16 @@ private: } auto op = GetTableOp(node); - if (NPgTypeAnn::NeedsValuesRename(node, op)) { - if (!NPgTypeAnn::RewriteValuesColumnNames(node, table, ctx, Types)) { + if (NPgTypeAnn::IsPgInsert(node, op)) { + TExprNode::TPtr newInput; + auto ok = NCommon::RenamePgSelectColumns(node.Input().Cast<TCoPgSelect>(), newInput, table->Metadata->ColumnOrder, ctx, Types); + if (!ok) { return TStatus::Error; } - return TStatus::Repeat; + if (newInput != node.Input().Ptr()) { + node.Ptr()->ChildRef(TKiWriteTable::idx_Input) = newInput; + return TStatus::Repeat; + } } if (!rowType) { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.cpp index 0102e5a95f..cb9593930a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.cpp @@ -1,5 +1,6 @@ #include "yql_kikimr_type_ann_pg.h" +#include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <util/string/join.h> namespace NYql { @@ -7,128 +8,12 @@ namespace NPgTypeAnn { using namespace NNodes; -namespace { - bool MatchesSetItemOption(const TExprBase& setItemOption, TStringBuf name) { - if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 0) { - if (setItemOption.Ref().ChildPtr(0)->Content() == name) { - return true; - } - } - return false; +bool IsPgInsert(const TKiWriteTable &node, TYdbOperation op) { + if (auto pgSelect = node.Input().Maybe<TCoPgSelect>()) { + return op == TYdbOperation::InsertAbort + && NCommon::NeedToRenamePgSelectColumns(pgSelect.Cast()); } - - TExprNode::TPtr GetSetItemOptionValue(const TExprBase& setItemOption) { - if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 1) { - return setItemOption.Ref().ChildPtr(1); - } - return nullptr; - } - - bool TransformPgSetItemOption( - const TKiWriteTable& node, - TStringBuf optionName, - std::function<void(const TExprBase&)> lambda - ) { - bool applied = false; - if (auto pgSelect = node.Input().Maybe<TCoPgSelect>()) { - for (const auto& option : pgSelect.Cast().SelectOptions()) { - if (option.Name() == "set_items") { - auto pgSetItems = option.Value().Cast<TExprList>(); - for (const auto& setItem : pgSetItems) { - auto setItemNode = setItem.Cast<TCoPgSetItem>(); - for (const auto& setItemOption : setItemNode.SetItemOptions()) { - if (MatchesSetItemOption(setItemOption, optionName)) { - applied = true; - lambda(setItemOption); - } - } - } - } - } - } - return applied; - } - - TExprNode::TPtr GetSetItemOption(const TKiWriteTable& node, TStringBuf optionName) { - TExprNode::TPtr nodePtr = nullptr; - TransformPgSetItemOption(node, optionName, [&nodePtr](const TExprBase& option) { - nodePtr = option.Ptr(); - }); - return nodePtr; - } -} //namespace - -bool NeedsValuesRename(const NNodes::TKiWriteTable &node, TYdbOperation op) { - auto fill = GetSetItemOption(node, "fill_target_columns"); - - return op == TYdbOperation::InsertAbort - && fill - && !GetSetItemOptionValue(TExprBase(fill)); -} - -bool RewriteValuesColumnNames( - const TKiWriteTable& node, - const TKikimrTableDescription* table, - TExprContext& ctx, - TTypeAnnotationContext& types -) { - bool ok = true; - TransformPgSetItemOption(node, "values", [&ok, &node, &table, &ctx, &types](const TExprBase &setItemOption) { - auto values = GetSetItemOptionValue(setItemOption); - if (values->ChildrenSize() > table->Metadata->Columns.size()) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << Sprintf( - "VALUES have %zu columns, INSERT INTO expects: %zu", - values->ChildrenSize(), - table->Metadata->Columns.size() - ))); - ok = false; - return; - } - TExprNode::TListType columns; - THashMap<TString, TString> valueColumnName; - columns.reserve(values->ChildrenSize()); - for (ui32 index = 0; index < values->ChildrenSize(); ++index) { - valueColumnName[values->Child(index)->Content()] = table->Metadata->ColumnOrder.at(index); - columns.push_back(Build<TCoAtom>(ctx, node.Pos()) - .Value(table->Metadata->ColumnOrder.at(index)) - .Done().Ptr()); - columns.back()->SetTypeAnn(values->Child(index)->GetTypeAnn()); - } - values->ChangeChildrenInplace(std::move(columns)); - auto input = node.Ptr()->ChildRef(TKiWriteTable::idx_Input); - const TTypeAnnotationNode* inputType; - switch (input->GetTypeAnn()->GetKind()) { - case ETypeAnnotationKind::List: - inputType = input->GetTypeAnn()->Cast<TListExprType>()->GetItemType(); - break; - default: - inputType = input->GetTypeAnn(); - break; - } - Y_ENSURE(inputType->GetKind() == ETypeAnnotationKind::Struct); - TVector<const TItemExprType*> rowTypeItems; - for (const auto& item : inputType->Cast<TStructExprType>()->GetItems()) { - rowTypeItems.emplace_back(ctx.MakeType<TItemExprType>(valueColumnName[item->GetName()], item->GetItemType())); - } - const TStructExprType* rowStructType = ctx.MakeType<TStructExprType>(rowTypeItems); - if (input->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) { - input->SetTypeAnn(ctx.MakeType<TListExprType>(rowStructType)); - } else { - input->SetTypeAnn(rowStructType); - } - types.SetColumnOrder(*input, TVector<TString>(table->Metadata->ColumnOrder.begin(), table->Metadata->ColumnOrder.begin() + values->ChildrenSize()), ctx, /*overwrite=*/true); - }); - if (ok) { - auto fill = GetSetItemOption(node, "fill_target_columns"); - fill->ChangeChildrenInplace({ - fill->Child(0), - Build<TCoAtom>(ctx, node.Pos()) - .Value("done") - .Done().Ptr() - }); - fill->ChildPtr(1)->SetTypeAnn(ctx.MakeType<TUnitExprType>()); - } - return ok; + return false; } bool ValidatePgUpdateKeys(const TKiWriteTable& node, const TKikimrTableDescription* table, TExprContext& ctx) { @@ -140,7 +25,8 @@ bool ValidatePgUpdateKeys(const TKiWriteTable& node, const TKikimrTableDescripti ok = false; } }; - TransformPgSetItemOption(node, "result", [&updateKeyCheck](const TExprBase& setItemOptionNode) { + auto pgSelect = node.Input().Cast<TCoPgSelect>(); + NCommon::TransformPgSetItemOption(pgSelect, "result", [&updateKeyCheck](const TExprBase& setItemOptionNode) { auto setItemOption = setItemOptionNode.Cast<TCoNameValueTuple>(); auto resultList = setItemOption.Value().Cast<TExprList>(); for (size_t id = 1; id < resultList.Size(); id++) { diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.h b/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.h index 02715fbe1e..8f04fd4c22 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.h +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann_pg.h @@ -6,16 +6,10 @@ namespace NYql { namespace NPgTypeAnn { -bool NeedsValuesRename( +bool IsPgInsert( const NNodes::TKiWriteTable& node, TYdbOperation op); -bool RewriteValuesColumnNames( - const NNodes::TKiWriteTable& node, - const TKikimrTableDescription* table, - TExprContext& ctx, - TTypeAnnotationContext& types); - bool ValidatePgUpdateKeys( const NNodes::TKiWriteTable& node, const TKikimrTableDescription* table, diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp index 0260f8f4a6..3c33045bf0 100644 --- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp @@ -191,8 +191,10 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { NULL::int2 ))"); auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT(!result.IsSuccess()); - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR), result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString()); + UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n" + " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); } { /* set NULL to not null pk column */ @@ -202,7 +204,7 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { NULL::int2, 123::int2 ))"); auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString()); UNIT_ASSERT_NO_DIFF(result.GetIssues().ToString(), "<main>: Error: Execution, code: 1060\n" " <main>: Error: Tried to insert NULL value into NOT NULL column: key, code: 2031\n"); diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 99f85527a3..e37b80e71c 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1181,7 +1181,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } - Y_UNIT_TEST(InsertFromSelect) { + Y_UNIT_TEST(InsertFromSelect_Simple) { TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); auto testSingleType = [&kikimr] (const TPgTypeTestSpec& spec) { @@ -1199,7 +1199,9 @@ Y_UNIT_TEST_SUITE(KqpPg) { --!syntax_pg INSERT INTO )" << emptyTableName << " (key, value) SELECT * FROM \"" << tableName << "\";" , TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::INTERNAL_ERROR); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + result = ExecutePgSelect(kikimr, emptyTableName); + ValidatePgYqlResult(result, spec); }; for (const auto& spec : typeSpecs) { @@ -1208,6 +1210,111 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(InsertFromSelect_NoReorder) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE t ( + columnA int4 PRIMARY KEY, + columnB int4 NOT NULL + ))", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO t SELECT 1 as columnC, 2 as columnA; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO t (columnA, columnB) SELECT 2 as columnB, 4 as columnA; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM t; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + Cerr << "Result:\n" << FormatResultSetYson(result.GetResultSet(0)) << Endl; + CompareYson(R"( + [["1";"2"];["2";"4"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(InsertFromSelect_Serial) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE movies ( + id serial PRIMARY KEY, + title text NOT NULL + ))", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE movies_2 ( + id serial PRIMARY KEY, + title text NOT NULL + ))", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO movies (id, title) VALUES (1, 'movie1'), (4, 'movie4'); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM movies; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"movie1"];["4";"movie4"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO movies_2 (title) + SELECT title + FROM movies; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM movies_2; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"movie1"];["2";"movie4"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(V1CreateTable) { TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); @@ -2712,7 +2819,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { INSERT INTO t VALUES (1, 'a', 'a'); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); - UNIT_ASSERT(result.GetIssues().ToString().Contains("VALUES have 3 columns, INSERT INTO expects: 2")); + UNIT_ASSERT(result.GetIssues().ToString().Contains("values have 3 columns, INSERT INTO expects: 2")); } { auto result = db.ExecuteQuery(R"( @@ -2779,6 +2886,42 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(Insert_Serial) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE movies ( + id serial PRIMARY KEY, + title text NOT NULL + ))", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO movies VALUES (1, 'movie1'), (4, 'movie4'); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + SELECT * FROM movies; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"movie1"];["4";"movie4"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + } + Y_UNIT_TEST(InsertNoTargetColumns_Serial) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); @@ -3227,7 +3370,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { INSERT INTO PgTable1 VALUES (1, 'a', 'a'); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); - UNIT_ASSERT(result.GetIssues().ToString().Contains("VALUES have 3 columns, INSERT INTO expects: 2")); + UNIT_ASSERT(result.GetIssues().ToString().Contains("values have 3 columns, INSERT INTO expects: 2")); } { auto result = db.ExecuteQuery(R"( diff --git a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp index 9636490b0f..a7138a13bb 100644 --- a/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp +++ b/ydb/library/yql/core/common_opt/yql_co_pgselect.cpp @@ -748,7 +748,6 @@ TExprNode::TPtr ExpandPositionalUnionAll(const TExprNode& node, const TVector<TC TExprNode::TPtr BuildValues( TPositionHandle pos, const TExprNode::TPtr& values, - const TExprNode::TPtr& targetColumns, TExprContext& ctx ) { return ctx.Builder(pos) @@ -759,9 +758,7 @@ TExprNode::TPtr BuildValues( .Callable("AsStruct") .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { for (ui32 index = 0; index < values->Child(1)->ChildrenSize(); ++index) { - TStringBuf alias = targetColumns - ? targetColumns->Child(1)->Child(index)->Content() - : values->Child(1)->Child(index)->Content(); + TStringBuf alias = values->Child(1)->Child(index)->Content(); parent .List(index) .Atom(0, alias) @@ -1716,8 +1713,8 @@ TExprNode::TPtr BuildProjectionLambda(TPositionHandle pos, const TExprNode::TPtr } } else { auto type = x->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); - for (ui32 i = 0; i < type->GetSize(); ++i) { - auto column = type->GetItems()[i]->GetName(); + for (const auto& item : type->GetItems()) { + TStringBuf column = item->GetName(); auto columnName = subLink ? column : NTypeAnnImpl::RemoveAlias(column); auto listBuilder = parent.List(index++); if (overrideColumns.contains(columnName)) { @@ -3148,14 +3145,13 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct auto sort = GetSetting(setItem->Tail(), "sort"); auto extraSortColumns = GetSetting(setItem->Tail(), "final_extra_sort_columns"); auto extraSortKeys = GetSetting(setItem->Tail(), "final_extra_sort_keys"); - auto targetColumns = GetSetting(setItem->Tail(), "target_columns"); bool emitPgStar = (GetSetting(setItem->Tail(), "emit_pg_star") != nullptr); bool unknownsAllowed = (GetSetting(setItem->Tail(), "unknowns_allowed") != nullptr); bool oneRow = !from; TExprNode::TPtr list; if (values) { YQL_ENSURE(!result); - list = BuildValues(node->Pos(), values, targetColumns, ctx); + list = BuildValues(node->Pos(), values, ctx); if (!unknownsAllowed) { auto pos = node->Pos(); @@ -3163,7 +3159,6 @@ TExprNode::TPtr ExpandPgSelectImpl(const TExprNode::TPtr& node, TExprContext& ct } } else { YQL_ENSURE(result); - YQL_ENSURE(!targetColumns, "target columns for projection are not supported yet"); TExprNode::TPtr projectionLambda = BuildProjectionLambda(node->Pos(), result, subLinkId.Defined(), emitPgStar, ctx); TExprNode::TPtr projectionArg = projectionLambda->Head().HeadPtr(); TExprNode::TPtr projectionRoot = projectionLambda->TailPtr(); diff --git a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp index 07c29b4009..adeed910d2 100644 --- a/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_columnorder.cpp @@ -42,43 +42,36 @@ void AddPrefix(TVector<TString>& columnOrder, const TString& prefix) { IGraphTransformer::TStatus OrderForPgSetItem(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); TVector<TString> columnOrder; - if (auto targetColumnsOption = GetSetting(node->Tail(), "target_columns")) { - TExprNode::TPtr targetColumns = targetColumnsOption->Child(1); - for (const auto& targetColumn : targetColumns->ChildrenList()) { - columnOrder.emplace_back(targetColumn->Content()); - } - } else { - auto result = GetSetting(node->Tail(), "result"); - auto emitPgStar = GetSetting(node->Tail(), "emit_pg_star"); - if (result) { - for (size_t i = 0; i < result->Tail().ChildrenSize(); i++) { - auto col = result->Tail().Child(i); - if (col->Head().IsAtom()) { - auto alias = TString(col->Head().Content()); - YQL_ENSURE(!alias.empty()); - if (!emitPgStar) { - columnOrder.push_back(alias); - } - } - else { - YQL_ENSURE(col->Head().IsList()); - for (const auto& x : col->Head().Children()) { - auto alias = TString(x->Content()); - YQL_ENSURE(!alias.empty()); - columnOrder.push_back(alias); - } + auto result = GetSetting(node->Tail(), "result"); + auto emitPgStar = GetSetting(node->Tail(), "emit_pg_star"); + if (result) { + for (size_t i = 0; i < result->Tail().ChildrenSize(); i++) { + auto col = result->Tail().Child(i); + if (col->Head().IsAtom()) { + auto alias = TString(col->Head().Content()); + YQL_ENSURE(!alias.empty()); + if (!emitPgStar) { + columnOrder.push_back(alias); } } - } else { - auto values = GetSetting(node->Tail(), "values"); - YQL_ENSURE(values); - TExprNode::TPtr valuesList = values->Child(1); - for (size_t i = 0; i < valuesList->ChildrenSize(); i++) { - auto alias = TString(valuesList->Child(i)->Content()); - YQL_ENSURE(!alias.empty()); - columnOrder.push_back(alias); + else { + YQL_ENSURE(col->Head().IsList()); + for (const auto& x : col->Head().Children()) { + auto alias = TString(x->Content()); + YQL_ENSURE(!alias.empty()); + columnOrder.push_back(alias); + } } } + } else { + auto values = GetSetting(node->Tail(), "values"); + YQL_ENSURE(values); + TExprNode::TPtr valuesList = values->Child(1); + for (size_t i = 0; i < valuesList->ChildrenSize(); i++) { + auto alias = TString(valuesList->Child(i)->Content()); + YQL_ENSURE(!alias.empty()); + columnOrder.push_back(alias); + } } return ctx.Types.SetColumnOrder(*node, columnOrder, ctx.Expr); diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp index 9b4d8feaca..adbc5a8210 100644 --- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp @@ -2732,7 +2732,6 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN bool hasUnknownsAllowed = false; TExprNode::TPtr groupExprs; TExprNode::TPtr result; - TExprNode::TPtr targetColumns; // pass 0 - from/values // pass 1 - join @@ -2772,7 +2771,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN "Incorrect fill_target_columns option")); return IGraphTransformer::TStatus::Error; } - } + } else if (optionName == "unknowns_allowed") { hasUnknownsAllowed = true; } @@ -2838,9 +2837,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN if (!EnsureAtom(*names->Child(i), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - TStringBuf columnName = targetColumns - ? targetColumns->Child(i)->Content() - : names->Child(i)->Content(); + TStringBuf columnName = names->Child(i)->Content(); outputItems.push_back(ctx.Expr.MakeType<TItemExprType>(columnName, tupleType->GetItems()[i])); } @@ -3038,9 +3035,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN } else { if (column->Head().IsAtom()) { - TStringBuf columnName = targetColumns - ? targetColumns->Child(index)->Content() - : column->Head().Content(); + TStringBuf columnName = column->Head().Content(); auto itemExpr = ctx.Expr.MakeType<TItemExprType>(columnName, column->Tail().GetTypeAnn()); if (hasEmitPgStar) { if (!outputItemIndex.contains(columnName)) { @@ -3053,15 +3048,8 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN } } else { // star or qualified star - size_t index = 0; for (const auto& item : column->Tail().GetTypeAnn()->Cast<TStructExprType>()->GetItems()) { auto itemRef = hasExtTypes ? item : RemoveAlias(item, ctx.Expr); - if (targetColumns) { - itemRef = ctx.Expr.MakeType<TItemExprType>( - targetColumns->Child(index++)->Content(), - itemRef->GetItemType() - ); - } if (hasEmitPgStar) { const auto& name = itemRef->GetName(); if (!outputItemIndex.contains(name)) { @@ -3983,33 +3971,6 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN if (!EnsureTupleSize(*option, 2, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - - if (pass == 0) { - if (!EnsureTupleMinSize(option->Tail(), 1, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - for (const auto& child : option->Tail().Children()) { - if (!EnsureAtom(*child, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - } - targetColumns = &option->Tail(); - if (auto values = GetSetting(options, "values")) { - if (values->Child(1)->ChildrenSize() != targetColumns->ChildrenSize()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()), - TStringBuilder() << "values and target_columns sizes do not match")); - return IGraphTransformer::TStatus::Error; - } - } - } - if (auto projectionOrder = GetSetting(options, "projection_order")) { - if (projectionOrder->ChildrenSize() != targetColumns->ChildrenSize()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()), - TStringBuilder() << "projection_order and target_columns sizes do not match")); - return IGraphTransformer::TStatus::Error; - } - } } else { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(option->Head().Pos()), TStringBuilder() << "Unsupported option: " << optionName)); diff --git a/ydb/library/yql/core/yql_type_annotation.cpp b/ydb/library/yql/core/yql_type_annotation.cpp index 6e39b4ac6a..be16ffdd96 100644 --- a/ydb/library/yql/core/yql_type_annotation.cpp +++ b/ydb/library/yql/core/yql_type_annotation.cpp @@ -89,7 +89,7 @@ TMaybe<TColumnOrder> TTypeAnnotationContext::LookupColumnOrder(const TExprNode& } IGraphTransformer::TStatus TTypeAnnotationContext::SetColumnOrder(const TExprNode& node, - const TColumnOrder& columnOrder, TExprContext& ctx, bool overwrite) + const TColumnOrder& columnOrder, TExprContext& ctx) { if (!OrderedColumns) { return IGraphTransformer::TStatus::Ok; @@ -99,11 +99,9 @@ IGraphTransformer::TStatus TTypeAnnotationContext::SetColumnOrder(const TExprNod YQL_ENSURE(node.IsCallable()); if (auto existing = ColumnOrderStorage->Lookup(node.UniqueId())) { - if (!overwrite) { - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), - TStringBuilder() << "Column order " << FormatColumnOrder(existing) << " is already set for node " << node.Content())); - return IGraphTransformer::TStatus::Error; - } + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), + TStringBuilder() << "Column order " << FormatColumnOrder(existing) << " is already set for node " << node.Content())); + return IGraphTransformer::TStatus::Error; } auto nodeType = node.GetTypeAnn(); diff --git a/ydb/library/yql/core/yql_type_annotation.h b/ydb/library/yql/core/yql_type_annotation.h index 9683480517..369af2ee06 100644 --- a/ydb/library/yql/core/yql_type_annotation.h +++ b/ydb/library/yql/core/yql_type_annotation.h @@ -264,7 +264,7 @@ struct TTypeAnnotationContext: public TThrRefBase { TColumnOrderStorage::TPtr ColumnOrderStorage = new TColumnOrderStorage; TMaybe<TColumnOrder> LookupColumnOrder(const TExprNode& node) const; - IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx, bool overwrite = false); + IGraphTransformer::TStatus SetColumnOrder(const TExprNode& node, const TColumnOrder& columnOrder, TExprContext& ctx); // cached constants std::optional<ui64> CachedNow; diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index 78f4f29608..9b240dac43 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -1396,5 +1396,148 @@ bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx) return ValidateValueInDictionary(formatName, ctx, TimestampFormatNames); } +namespace { + bool MatchesSetItemOption(const TExprBase& setItemOption, TStringBuf name) { + if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 0) { + if (setItemOption.Ref().ChildPtr(0)->Content() == name) { + return true; + } + } + return false; + } +} //namespace + +bool TransformPgSetItemOption( + const TCoPgSelect& pgSelect, + TStringBuf optionName, + std::function<void(const TExprBase&)> lambda +) { + bool applied = false; + for (const auto& option : pgSelect.SelectOptions()) { + if (option.Name() == "set_items") { + auto pgSetItems = option.Value().Cast<TExprList>(); + for (const auto& setItem : pgSetItems) { + auto setItemNode = setItem.Cast<TCoPgSetItem>(); + for (const auto& setItemOption : setItemNode.SetItemOptions()) { + if (MatchesSetItemOption(setItemOption, optionName)) { + applied = true; + lambda(setItemOption); + } + } + } + } + } + return applied; +} + +TExprNode::TPtr GetSetItemOption(const TCoPgSelect& pgSelect, TStringBuf optionName) { + TExprNode::TPtr nodePtr = nullptr; + TransformPgSetItemOption(pgSelect, optionName, [&nodePtr](const TExprBase& option) { + nodePtr = option.Ptr(); + }); + return nodePtr; +} + +TExprNode::TPtr GetSetItemOptionValue(const TExprBase& setItemOption) { + if (setItemOption.Ref().IsList() && setItemOption.Ref().ChildrenSize() > 1) { + return setItemOption.Ref().ChildPtr(1); + } + return nullptr; +} + +bool NeedToRenamePgSelectColumns(const TCoPgSelect& pgSelect) { + auto fill = NCommon::GetSetItemOption(pgSelect, "fill_target_columns"); + return fill && !NCommon::GetSetItemOptionValue(TExprBase(fill)); +} + +bool RenamePgSelectColumns( + const TCoPgSelect& node, + TExprNode::TPtr& output, + TMaybe<TVector<TString>> tableColumnOrder, + TExprContext& ctx, + TTypeAnnotationContext& types) { + + bool hasValues = (bool)GetSetItemOption(node, "values"); + bool hasProjectionOrder = (bool)GetSetItemOption(node, "projection_order"); + Y_ENSURE(hasValues ^ hasProjectionOrder, "Only one of values and projection_order should be present"); + TString optionName = (hasValues) ? "values" : "projection_order"; + + auto selectorColumnOrder = types.LookupColumnOrder(node.Ref()); + TVector<TString> insertColumnOrder; + if (auto targetColumnsOption = GetSetItemOption(node, "target_columns")) { + auto targetColumns = GetSetItemOptionValue(TExprBase(targetColumnsOption)); + for (const auto& child : targetColumns->ChildrenList()) { + insertColumnOrder.emplace_back(child->Content()); + } + } else { + YQL_ENSURE(tableColumnOrder); + insertColumnOrder = *tableColumnOrder; + } + YQL_ENSURE(selectorColumnOrder); + if (selectorColumnOrder->size() > insertColumnOrder.size()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() << Sprintf( + "%s have %zu columns, INSERT INTO expects: %zu", + optionName.Data(), + selectorColumnOrder->size(), + insertColumnOrder.size() + ))); + return false; + } + + if (selectorColumnOrder == insertColumnOrder) { + output = node.Ptr(); + return true; + } + + TVector<const TItemExprType*> rowTypeItems; + rowTypeItems.reserve(selectorColumnOrder->size()); + const TTypeAnnotationNode* inputType; + switch (node.Ref().GetTypeAnn()->GetKind()) { + case ETypeAnnotationKind::List: + inputType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType(); + break; + default: + inputType = node.Ref().GetTypeAnn(); + break; + } + YQL_ENSURE(inputType->GetKind() == ETypeAnnotationKind::Struct); + + const auto rowArg = Build<TCoArgument>(ctx, node.Pos()) + .Name("row") + .Done(); + auto structBuilder = Build<TCoAsStruct>(ctx, node.Pos()); + + for (size_t i = 0; i < selectorColumnOrder->size(); i++) { + const auto& columnName = selectorColumnOrder->at(i); + structBuilder.Add<TCoNameValueTuple>() + .Name().Build(insertColumnOrder.at(i)) + .Value<TCoMember>() + .Struct(rowArg) + .Name().Build(columnName) + .Build() + .Build(); + } + + auto fill = GetSetItemOption(node, "fill_target_columns"); + + output = Build<TCoMap>(ctx, node.Pos()) + .Input(node) + .Lambda<TCoLambda>() + .Args({rowArg}) + .Body(structBuilder.Done().Ptr()) + .Build() + .Done().Ptr(); + + fill->ChangeChildrenInplace({ + fill->Child(0), + Build<TCoAtom>(ctx, node.Pos()) + .Value("done") + .Done().Ptr() + }); + fill->ChildPtr(1)->SetTypeAnn(ctx.MakeType<TUnitExprType>()); + + return true; +} + } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index 4d0595ef46..9b86ff3f29 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -191,5 +191,23 @@ bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx); bool ValidateDateTimeFormatName(std::string_view formatName, TExprContext& ctx); bool ValidateTimestampFormatName(std::string_view formatName, TExprContext& ctx); +bool TransformPgSetItemOption( + const NNodes::TCoPgSelect& pgSelect, + TStringBuf optionName, + std::function<void(const NNodes::TExprBase&)> lambda +); + +TExprNode::TPtr GetSetItemOption(const NNodes::TCoPgSelect& pgSelect, TStringBuf optionName); + +TExprNode::TPtr GetSetItemOptionValue(const NNodes::TExprBase& setItemOption); + +bool NeedToRenamePgSelectColumns(const NNodes::TCoPgSelect& pgSelect); + +bool RenamePgSelectColumns( + const NNodes::TCoPgSelect& node, + TExprNode::TPtr& output, + TMaybe<TVector<TString>> tableColumnOrder, + TExprContext& ctx, + TTypeAnnotationContext& types); } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp index 371c58c36c..8c6ee6fbea 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_type_ann.cpp @@ -445,6 +445,31 @@ private: } } + TMaybe<TColumnOrder> contentColumnOrder; + if (content) { + contentColumnOrder = State_->Types->LookupColumnOrder(*content); + if (content->IsCallable("AssumeColumnOrder")) { + YQL_ENSURE(contentColumnOrder); + YQL_CLOG(INFO, ProviderYt) << "Dropping top level " << content->Content() << " from WriteTable input"; + content = content->HeadPtr(); + } + } + + if (content && TCoPgSelect::Match(content.Get())) { + auto pgSelect = TCoPgSelect(content); + if (NCommon::NeedToRenamePgSelectColumns(pgSelect)) { + TExprNode::TPtr output; + bool result = NCommon::RenamePgSelectColumns(pgSelect, output, contentColumnOrder, ctx, *State_->Types); + if (!result) { + return TStatus::Error; + } + if (output != content) { + content = output; + return TStatus::Repeat; + } + } + } + if (checkLayout) { auto rowSpec = description.RowSpec; TString modeStr = EYtWriteMode::RenewKeepMeta == mode ? "truncate with keep meta" : ToString(mode); @@ -496,16 +521,6 @@ private: } } - TMaybe<TColumnOrder> contentColumnOrder; - if (content) { - contentColumnOrder = State_->Types->LookupColumnOrder(*content); - if (content->IsCallable("AssumeColumnOrder")) { - YQL_ENSURE(contentColumnOrder); - YQL_CLOG(INFO, ProviderYt) << "Dropping top level " << content->Content() << " from WriteTable input"; - content = content->HeadPtr(); - } - } - if (auto commitEpoch = outTableInfo.CommitEpoch.GetOrElse(0)) { TYtTableDescription& nextDescription = State_->TablesData->GetOrAddTable(cluster, outTableInfo.Name, commitEpoch); diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index a8cfb97160..6423ffec9e 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -991,7 +991,8 @@ public: } if (!targetColumns.empty()) { setItemOptions.push_back(QL(QA("target_columns"), QVL(targetColumns.data(), targetColumns.size()))); - } else if (fillTargetColumns) { + } + if (fillTargetColumns) { setItemOptions.push_back(QL(QA("fill_target_columns"))); } if (ListLength(x->targetList) > 0) { |