diff options
| author | Vitalii Gridnev <[email protected]> | 2024-01-11 17:57:58 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-11 17:57:58 +0300 |
| commit | e67863791f9b920db900dbf300c5abaee981b131 (patch) | |
| tree | 74ac77fae55dedc7a4594de60cbb5824354f8ccc | |
| parent | 53aa305accd90893de5a617df796bef4c2a74fdb (diff) | |
fix upsert overwriting non-default values (#926)
| -rw-r--r-- | ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 10 | ||||
| -rw-r--r-- | ydb/core/kqp/host/kqp_type_ann.cpp | 7 | ||||
| -rw-r--r-- | ydb/core/kqp/opt/kqp_opt_kql.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h | 3 | ||||
| -rw-r--r-- | ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp | 235 | ||||
| -rw-r--r-- | ydb/core/kqp/opt/physical/effects/ya.make | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 7 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 21 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp | 109 |
10 files changed, 355 insertions, 54 deletions
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 113b59a5f5d..df0a9834dfd 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -262,6 +262,16 @@ ] }, { + "Name": "TKqlInsertOnConflictUpdateRows", + "Base": "TKqlUpsertRowsBase", + "Match": {"Type": "Callable", "Name": "KqlInsertOnConflictUpdateRows"}, + "Children": [ + {"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "GenerateColumnsIfInsert", "Type": "TCoAtomList"}, + {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + ] + }, + { "Name": "TKqlUpsertRowsIndex", "Base": "TKqlUpsertRowsBase", "Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"}, diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index a45387d6473..59162574e04 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -553,7 +553,12 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const itemType = input->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType(); isStream = true; } else { - YQL_ENSURE(TKqlUpsertRows::Match(node.Get()) || TKqlUpsertRowsIndex::Match(node.Get())); + + YQL_ENSURE( + TKqlUpsertRows::Match(node.Get()) || + TKqlUpsertRowsIndex::Match(node.Get()) || + TKqlInsertOnConflictUpdateRows::Match(node.Get()) + ); if (!EnsureListType(*input, ctx)) { return TStatus::Error; diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index c3a26241209..50a02406d19 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -283,7 +283,21 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC const TCoAtomList& autoincrement, const TKikimrTableDescription& table, TExprContext& ctx) { + auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert"); + YQL_ENSURE(generateColumnsIfInsertNode); + TCoAtomList generateColumnsIfInsert = TCoNameValueTuple(generateColumnsIfInsertNode).Value().Cast<TCoAtomList>(); + const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); + if (generateColumnsIfInsert.Ref().ChildrenSize() > 0) { + return Build<TKqlInsertOnConflictUpdateRows>(ctx, write.Pos()) + .Table(BuildTableMeta(table, write.Pos(), ctx)) + .Input(input.Ptr()) + .Columns(columns.Ptr()) + .ReturningColumns(write.ReturningColumns()) + .GenerateColumnsIfInsert(generateColumnsIfInsert) + .Done(); + } + auto effect = Build<TKqlUpsertRows>(ctx, write.Pos()) .Table(BuildTableMeta(table, write.Pos(), ctx)) .Input(input.Ptr()) diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h index 5ed944c27ea..d35c4718d09 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_rules.h @@ -12,6 +12,9 @@ NYql::NNodes::TExprBase KqpBuildReturning(NYql::NNodes::TExprBase node, NYql::TE NYql::NNodes::TExprBase KqpRewriteReturningUpsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpRewriteGenerateIfInsert(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpBuildUpdateStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp new file mode 100644 index 00000000000..48e25ba5301 --- /dev/null +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp @@ -0,0 +1,235 @@ +#include "kqp_opt_phy_effects_rules.h" +#include "kqp_opt_phy_effects_impl.h" + +using namespace NYql; +using namespace NYql::NNodes; + +namespace NKikimr::NKqp::NOpt { + +TMaybeNode<TDqPhyPrecompute> PrecomputeCurrentDefaultsForKeys(const TDqPhyPrecompute& lookupKeys, + const TCoAtomList& columnsWithDefault, + const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) +{ + TVector<TExprBase> lookupColumns; + + for(const auto& key: table.Metadata->KeyColumnNames) { + auto atom = Build<TCoAtom>(ctx, pos) + .Value(key) + .Done(); + + lookupColumns.emplace_back(std::move(atom)); + } + + for(const auto& atom: columnsWithDefault) { + lookupColumns.push_back(atom); + } + + auto lookupColumnsList = Build<TCoAtomList>(ctx, pos) + .Add(lookupColumns) + .Done(); + + auto lookupStage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(lookupKeys) + .Build() + .Program() + .Args({"keys_list"}) + .Body<TKqpLookupTable>() + .Table(BuildTableMeta(table, pos, ctx)) + .LookupKeys<TCoIterator>() + .List("keys_list") + .Build() + .Columns(lookupColumnsList) + .Build() + .Build() + .Settings().Build() + .Done(); + + auto lookup = Build<TDqCnUnionAll>(ctx, pos) + .Output() + .Stage(lookupStage) + .Index().Build("0") + .Build() + .Done(); + + auto lookupPayloadSelector = MakeRowsPayloadSelector(lookupColumnsList, table, lookupKeys.Pos(), ctx); + auto condenseLookupResult = CondenseInputToDictByPk(lookup, table, lookupPayloadSelector, ctx); + if (!condenseLookupResult) { + return {}; + } + + auto computeDictStage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(condenseLookupResult->StageInputs) + .Build() + .Program() + .Args(condenseLookupResult->StageArgs) + .Body(condenseLookupResult->Stream) + .Build() + .Settings().Build() + .Done(); + + return Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(computeDictStage) + .Index().Build("0") + .Build() + .Build() + .Done(); +} + +TCoAtomList BuildNonDefaultColumns( + const TKikimrTableDescription& table, + const TCoAtomList& allColumns, + const TCoAtomList& columnsWithDefault, + TPositionHandle pos, TExprContext& ctx) +{ + TVector<TExprBase> columnsToUpdateSet; + std::unordered_set<TString> unchangedColumns; + + for(const auto& column: columnsWithDefault) { + unchangedColumns.emplace(TString(column)); + } + + for(const TString& key: table.Metadata->KeyColumnNames) { + unchangedColumns.emplace(key); + } + + for (const auto& column : allColumns) { + auto colName = TString(column); + auto it = unchangedColumns.find(colName); + if (it != unchangedColumns.end()) { + continue; + } + + auto atom = Build<TCoAtom>(ctx, pos) + .Value(colName) + .Done(); + + columnsToUpdateSet.emplace_back(std::move(atom)); + } + + return Build<TCoAtomList>(ctx, pos) + .Add(columnsToUpdateSet) + .Done(); +} + +TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + auto maybeInsertOnConlictUpdate = node.Maybe<TKqlInsertOnConflictUpdateRows>(); + if (!maybeInsertOnConlictUpdate) { + return node; + } + + auto insertOnConlictUpdate = maybeInsertOnConlictUpdate.Cast(); + YQL_ENSURE(insertOnConlictUpdate.GenerateColumnsIfInsert().Ref().ChildrenSize() > 0); + TCoAtomList columnsWithDefault = insertOnConlictUpdate.GenerateColumnsIfInsert(); + + auto input = insertOnConlictUpdate.Input(); + auto pos = insertOnConlictUpdate.Input().Pos(); + + const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insertOnConlictUpdate.Table().Path()); + + auto payloadSelector = MakeRowsPayloadSelector(insertOnConlictUpdate.Columns(), tableDesc, pos, ctx); + auto condenseResult = CondenseInputToDictByPk(input, tableDesc, payloadSelector, ctx); + if (!condenseResult) { + return node; + } + + auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx); + auto lookupDict = PrecomputeCurrentDefaultsForKeys(inputDictAndKeys.KeysPrecompute, columnsWithDefault, tableDesc, pos, ctx); + if (!lookupDict) { + return node; + } + + auto nonDefaultColumns = BuildNonDefaultColumns(tableDesc, insertOnConlictUpdate.Columns(), columnsWithDefault, pos, ctx); + + auto inputKeysArg = TCoArgument(ctx.NewArgument(pos, "input_keys")); + auto inputDictArg = TCoArgument(ctx.NewArgument(pos, "input_dict")); + auto inputKeyArg = TCoArgument(ctx.NewArgument(pos, "input_key")); + auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "lookup_dict")); + auto presetHandlerPayload = TCoArgument(ctx.NewArgument(pos, "payload")); + + auto filterStage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(inputDictAndKeys.KeysPrecompute) + .Add(inputDictAndKeys.DictPrecompute) + .Add(lookupDict.Cast()) + .Build() + .Program() + .Args({inputKeysArg, inputDictArg, lookupDictArg}) + .Body<TCoIterator>() + .List<TCoMap>() + .Input(inputKeysArg) + .Lambda() + .Args(inputKeyArg) + .Body<TCoIfPresent>() + .Optional<TCoLookup>() + .Collection(lookupDictArg) + .Lookup(inputKeyArg) + .Build() + .PresentHandler<TCoLambda>() + .Args(presetHandlerPayload) + .Body<TCoFlattenMembers>() + .Add() + .Name().Build("") + .Value(presetHandlerPayload) + .Build() + .Add() + .Name().Build("") + .Value<TCoUnwrap>() + .Optional<TCoExtractMembers>() + .Input<TCoLookup>() + .Collection(inputDictArg) + .Lookup(inputKeyArg) + .Build() + .Members(nonDefaultColumns) + .Build() + .Build() + .Build() + .Add() + .Name().Build("") + .Value(inputKeyArg) + .Build() + .Build() + .Build() + .MissingValue<TCoFlattenMembers>() + .Add() + .Name().Build("") + .Value<TCoUnwrap>() + .Optional<TCoLookup>() + .Collection(inputDictArg) + .Lookup(inputKeyArg) + .Build() + .Build() + .Build() + .Add() + .Name().Build("") + .Value(inputKeyArg) + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); + + auto newInput = Build<TDqCnUnionAll>(ctx, pos) + .Output() + .Stage(filterStage) + .Index().Build("0") + .Build() + .Done(); + + return Build<TKqlUpsertRows>(ctx, insertOnConlictUpdate.Pos()) + .Input(newInput.Ptr()) + .Table(insertOnConlictUpdate.Table()) + .Columns(insertOnConlictUpdate.Columns()) + .Settings(insertOnConlictUpdate.Settings()) + .ReturningColumns(insertOnConlictUpdate.ReturningColumns()) + .Done(); +} + +} // namespace NKikimr::NKqp::NOpt
\ No newline at end of file diff --git a/ydb/core/kqp/opt/physical/effects/ya.make b/ydb/core/kqp/opt/physical/effects/ya.make index 68e6e7482f0..f0c0f0e0d32 100644 --- a/ydb/core/kqp/opt/physical/effects/ya.make +++ b/ydb/core/kqp/opt/physical/effects/ya.make @@ -11,6 +11,7 @@ SRCS( kqp_opt_phy_update.cpp kqp_opt_phy_upsert_index.cpp kqp_opt_phy_returning.cpp + kqp_opt_phy_upsert_defaults.cpp ) PEERDIR( diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 8167c175950..330bcd400f9 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -68,6 +68,7 @@ public: AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>)); AddHandler(0, &TKqlInsertRows::Match, HNDL(BuildInsertStages)); AddHandler(0, &TKqlUpdateRows::Match, HNDL(BuildUpdateStages)); + AddHandler(0, &TKqlInsertOnConflictUpdateRows::Match, HNDL(RewriteGenerateIfInsert)); AddHandler(0, &TKqlUpdateRowsIndex::Match, HNDL(BuildUpdateIndexStages)); AddHandler(0, &TKqlUpsertRowsIndex::Match, HNDL(BuildUpsertIndexStages)); AddHandler(0, &TKqlInsertRowsIndex::Match, HNDL(BuildInsertIndexStages)); @@ -144,6 +145,12 @@ protected: return output; } + TMaybeNode<TExprBase> RewriteGenerateIfInsert(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpRewriteGenerateIfInsert(node, ctx, KqpCtx); + DumpAppliedRule("RewriteGenerateIfInsert", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> BuildReadTableStage(TExprBase node, TExprContext& ctx) { TExprBase output = KqpBuildReadTableStage(node, ctx, KqpCtx); DumpAppliedRule("BuildReadTableStage", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 14d28479dac..2735e8a12ff 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1095,7 +1095,7 @@ public: return SyncError(); } else if (constraint.Name().Value() == "default") { auto columnBuild = indexBuildSettings.mutable_column_build_operation()->add_column(); - columnBuild->SetColumnName(TString(constraint.Name().Value())); + columnBuild->SetColumnName(TString(columnName)); FillLiteralProto(constraint.Value().Cast<TCoDataCtor>(), *columnBuild->mutable_default_from_literal()); } } diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index abacd671c75..67c4396f949 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -415,6 +415,8 @@ private: defaultConstraintColumnsSet.emplace(keyColumnName); } + THashSet<TString> generateColumnsIfInsertColumnsSet; + for(const auto& [name, info] : table->Metadata->Columns) { if (rowType->FindItem(name)) { continue; @@ -424,7 +426,15 @@ private: continue; } + if (defaultConstraintColumnsSet.find(name) != defaultConstraintColumnsSet.end()) { + continue; + } + if (info.IsDefaultKindDefined()) { + if (op == TYdbOperation::Upsert) { + generateColumnsIfInsertColumnsSet.emplace(name); + } + defaultConstraintColumnsSet.emplace(name); } } @@ -485,6 +495,11 @@ private: defaultConstraintColumns.push_back(ctx.NewAtom(node.Pos(), generatedColumn)); } + TExprNode::TListType generateColumnsIfInsert; + for(auto& generatedColumn: generateColumnsIfInsertColumnsSet) { + generateColumnsIfInsert.push_back(ctx.NewAtom(node.Pos(), generatedColumn)); + } + node.Ptr()->ChildRef(TKiWriteTable::idx_Settings) = Build<TCoNameValueTupleList>(ctx, node.Pos()) .Add(node.Settings()) .Add() @@ -499,6 +514,12 @@ private: .Add(defaultConstraintColumns) .Build() .Build() + .Add() + .Name().Build("generate_columns_if_insert") + .Value<TCoAtomList>() + .Add(generateColumnsIfInsert) + .Build() + .Build() .Done() .Ptr(); diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 5a8e89b7543..249a8a205d4 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -491,11 +491,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { result.GetIssues().ToString()); } - { - TString query = R"( - UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old"); - )"; - + auto fQuery = [&](TString query) -> TString { NYdb::NTable::TExecDataQuerySettings execSettings; execSettings.KeepInQueryCache(true); execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); @@ -508,12 +504,32 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } + if (result.GetResultSets().size() > 0) + return NYdb::FormatResultSetYson(result.GetResultSet(0)); + return ""; + }; + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old"); + )"); + + auto fCompareTable = [&](TString expected) { + TString query = R"( + SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key; + )"; + CompareYson(expected, fQuery(query)); + }; + + fCompareTable(R"( + [ + [[1u];["Old"]] + ] + )"); { auto query = R"( --!syntax_v1 - ALTER TABLE `/Root/AlterTableAddNotNullColumn` ADD COLUMN Exists Int32 NOT NULL DEFAULT 1; + ALTER TABLE `/Root/AlterTableAddNotNullColumn` ADD COLUMN Value2 Int32 NOT NULL DEFAULT 1; )"; auto result = session.ExecuteSchemeQuery(query).GetValueSync(); @@ -523,51 +539,40 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { Sleep(TDuration::Seconds(3)); - { - TString query = R"( - INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New"); - )"; - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.KeepInQueryCache(true); - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = - session - .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), - execSettings) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, - result.GetIssues().ToString()); - } - - { - TString query = R"( - SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key; - )"; - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.KeepInQueryCache(true); - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = - session - .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), - execSettings) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, - result.GetIssues().ToString()); - - Cerr << NYdb::FormatResultSetYson(result.GetResultSet(0)) << Endl; - CompareYson(R"( - [ - [[1u];["Old"];[1]];[[2u];["New"];[1]] - ] - )", - NYdb::FormatResultSetYson(result.GetResultSet(0))); - } + fQuery(R"( + INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];[1]];[[2u];["New"];[1]] + ] + )"); + + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value, Value2) VALUES (2, "New", 2); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];[1]];[[2u];["New"];[2]] + ] + )"); + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "OldNew"); + )"); + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (3, "BrandNew"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];[1]];[[2u];["OldNew"];[2]];[[3u];["BrandNew"];[1]] + ] + )"); } |
