diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2024-01-16 18:08:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-16 18:08:12 +0300 |
commit | 07c49db084817e18fd41123d3fa71e74229da8a8 (patch) | |
tree | b47553886ce7f9739e2489fbc3b884ef9becbe88 | |
parent | 24e4490ee65ed54e0c2c12317e424275664cf493 (diff) | |
download | ydb-07c49db084817e18fd41123d3fa71e74229da8a8.tar.gz |
rewrite upsert for columns with defaults (#1035)
7 files changed, 213 insertions, 118 deletions
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 337746019d..970c0d6589 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -277,7 +277,8 @@ "Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"}, "Children": [ {"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"}, - {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + {"Index": 4, "Name": "GenerateColumnsIfInsert", "Type": "TCoAtomList"}, + {"Index": 5, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} ] }, { diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 50a02406d1..90db2d10cc 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -313,11 +313,16 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis const TKikimrTableDescription& table, TExprContext& ctx) { const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); + auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert"); + YQL_ENSURE(generateColumnsIfInsertNode); + TCoAtomList generateColumnsIfInsert = TCoNameValueTuple(generateColumnsIfInsertNode).Value().Cast<TCoAtomList>(); + auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos()) .Table(BuildTableMeta(table, write.Pos(), ctx)) .Input(input.Ptr()) .Columns(columns.Ptr()) .ReturningColumns(write.ReturningColumns()) + .GenerateColumnsIfInsert(generateColumnsIfInsert) .Done(); return effect; @@ -348,6 +353,7 @@ TExprBase BuildReplaceTableWithIndex(const TKiWriteTable& write, const TCoAtomLi .Input(input.Ptr()) .Columns(columns.Ptr()) .ReturningColumns(write.ReturningColumns()) + .GenerateColumnsIfInsert<TCoAtomList>().Build() .Done(); return effect; diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h index e2be88de4c..3765e156ec 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h @@ -80,6 +80,7 @@ NYql::NNodes::TMaybeNode<NYql::NNodes::TExprList> KqpPhyUpsertIndexEffectsImpl(T const NYql::NNodes::TExprBase& inputRows, const NYql::NNodes::TCoAtomList& inputColumns, const NYql::NNodes::TCoAtomList& returningColumns, + const NYql::NNodes::TCoAtomList& columnsWithDefaults, const NYql::TKikimrTableDescription& table, const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings, NYql::TPositionHandle pos, NYql::TExprContext& ctx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp index fb5b6046a4..e00577ec7c 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp @@ -15,8 +15,10 @@ TExprBase KqpBuildUpdateIndexStages(TExprBase node, TExprContext& ctx, const TKq auto update = node.Cast<TKqlUpdateRowsIndex>(); const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, update.Table().Path()); + TCoAtomList empty = Build<TCoAtomList>(ctx, node.Pos()).Done(); + auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::UpdateOn, update.Input(), - update.Columns(), update.ReturningColumns(), table, update.Settings(), update.Pos(), ctx); + update.Columns(), update.ReturningColumns(), empty, table, update.Settings(), update.Pos(), ctx); if (!effects) { return node; diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp index 48e25ba530..d2f890cb3d 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_defaults.cpp @@ -24,59 +24,8 @@ TMaybeNode<TDqPhyPrecompute> PrecomputeCurrentDefaultsForKeys(const TDqPhyPrecom lookupColumns.push_back(atom); } - auto lookupColumnsList = Build<TCoAtomList>(ctx, pos) - .Add(lookupColumns) - .Done(); - - auto lookupStage = Build<TDqStage>(ctx, pos) - .Inputs() - .Add(lookupKeys) - .Build() - .Program() - .Args({"keys_list"}) - .Body<TKqpLookupTable>() - .Table(BuildTableMeta(table, pos, ctx)) - .LookupKeys<TCoIterator>() - .List("keys_list") - .Build() - .Columns(lookupColumnsList) - .Build() - .Build() - .Settings().Build() - .Done(); - - auto lookup = Build<TDqCnUnionAll>(ctx, pos) - .Output() - .Stage(lookupStage) - .Index().Build("0") - .Build() - .Done(); - - auto lookupPayloadSelector = MakeRowsPayloadSelector(lookupColumnsList, table, lookupKeys.Pos(), ctx); - auto condenseLookupResult = CondenseInputToDictByPk(lookup, table, lookupPayloadSelector, ctx); - if (!condenseLookupResult) { - return {}; - } - - auto computeDictStage = Build<TDqStage>(ctx, pos) - .Inputs() - .Add(condenseLookupResult->StageInputs) - .Build() - .Program() - .Args(condenseLookupResult->StageArgs) - .Body(condenseLookupResult->Stream) - .Build() - .Settings().Build() - .Done(); - - return Build<TDqPhyPrecompute>(ctx, pos) - .Connection<TDqCnValue>() - .Output() - .Stage(computeDictStage) - .Index().Build("0") - .Build() - .Build() - .Done(); + return PrecomputeTableLookupDict( + lookupKeys, table, lookupColumns, pos, ctx, false); } TCoAtomList BuildNonDefaultColumns( @@ -115,34 +64,15 @@ TCoAtomList BuildNonDefaultColumns( .Done(); } -TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { - auto maybeInsertOnConlictUpdate = node.Maybe<TKqlInsertOnConflictUpdateRows>(); - if (!maybeInsertOnConlictUpdate) { - return node; - } - - auto insertOnConlictUpdate = maybeInsertOnConlictUpdate.Cast(); - YQL_ENSURE(insertOnConlictUpdate.GenerateColumnsIfInsert().Ref().ChildrenSize() > 0); - TCoAtomList columnsWithDefault = insertOnConlictUpdate.GenerateColumnsIfInsert(); - - auto input = insertOnConlictUpdate.Input(); - auto pos = insertOnConlictUpdate.Input().Pos(); - - const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insertOnConlictUpdate.Table().Path()); - - auto payloadSelector = MakeRowsPayloadSelector(insertOnConlictUpdate.Columns(), tableDesc, pos, ctx); - auto condenseResult = CondenseInputToDictByPk(input, tableDesc, payloadSelector, ctx); - if (!condenseResult) { - return node; - } - - auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx); - auto lookupDict = PrecomputeCurrentDefaultsForKeys(inputDictAndKeys.KeysPrecompute, columnsWithDefault, tableDesc, pos, ctx); - if (!lookupDict) { - return node; - } - - auto nonDefaultColumns = BuildNonDefaultColumns(tableDesc, insertOnConlictUpdate.Columns(), columnsWithDefault, pos, ctx); +TDqStage BuildInsertOnConflictUpdateInputStage( + const TKikimrTableDescription& table, + const TCoAtomList& upsertColumns, + const TCoAtomList& columnsWithDefault, + const TDictAndKeysResult& inputDictAndKeys, + const TDqPhyPrecompute& lookupDict, + TPositionHandle pos, TExprContext& ctx) +{ + auto nonDefaultColumns = BuildNonDefaultColumns(table, upsertColumns, columnsWithDefault, pos, ctx); auto inputKeysArg = TCoArgument(ctx.NewArgument(pos, "input_keys")); auto inputDictArg = TCoArgument(ctx.NewArgument(pos, "input_dict")); @@ -150,11 +80,11 @@ TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TK auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "lookup_dict")); auto presetHandlerPayload = TCoArgument(ctx.NewArgument(pos, "payload")); - auto filterStage = Build<TDqStage>(ctx, pos) + return Build<TDqStage>(ctx, pos) .Inputs() .Add(inputDictAndKeys.KeysPrecompute) .Add(inputDictAndKeys.DictPrecompute) - .Add(lookupDict.Cast()) + .Add(lookupDict) .Build() .Program() .Args({inputKeysArg, inputDictArg, lookupDictArg}) @@ -215,10 +145,38 @@ TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TK .Build() .Settings().Build() .Done(); +} + +TExprBase KqpRewriteGenerateIfInsert(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + auto maybeInsertOnConlictUpdate = node.Maybe<TKqlInsertOnConflictUpdateRows>(); + if (!maybeInsertOnConlictUpdate) { + return node; + } + + auto insertOnConlictUpdate = maybeInsertOnConlictUpdate.Cast(); + YQL_ENSURE(insertOnConlictUpdate.GenerateColumnsIfInsert().Ref().ChildrenSize() > 0); + TCoAtomList columnsWithDefault = insertOnConlictUpdate.GenerateColumnsIfInsert(); + + auto input = insertOnConlictUpdate.Input(); + auto pos = insertOnConlictUpdate.Input().Pos(); + + const auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, insertOnConlictUpdate.Table().Path()); + + auto payloadSelector = MakeRowsPayloadSelector(insertOnConlictUpdate.Columns(), tableDesc, pos, ctx); + auto condenseResult = CondenseInputToDictByPk(input, tableDesc, payloadSelector, ctx); + if (!condenseResult) { + return node; + } + + auto inputDictAndKeys = PrecomputeDictAndKeys(*condenseResult, pos, ctx); + auto lookupDict = PrecomputeCurrentDefaultsForKeys(inputDictAndKeys.KeysPrecompute, columnsWithDefault, tableDesc, pos, ctx); + if (!lookupDict) { + return node; + } auto newInput = Build<TDqCnUnionAll>(ctx, pos) .Output() - .Stage(filterStage) + .Stage(BuildInsertOnConflictUpdateInputStage(tableDesc, insertOnConlictUpdate.Columns(), columnsWithDefault, inputDictAndKeys, lookupDict.Cast(), pos, ctx)) .Index().Build("0") .Build() .Done(); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index c9e11842f4..c868d065ce 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -344,7 +344,8 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput .Done(); } -TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns, +TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns, + const THashSet<TString>& checkDefaults, const TKikimrTableDescription& table, const TSecondaryIndexes& indexes, TPositionHandle pos, TExprContext& ctx) { @@ -379,28 +380,37 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c } } - if (missedKeyInput && hasUniqIndex) { + if (!hasUniqIndex) { + missedKeyInput.clear(); + } + + if (!missedKeyInput.empty() || !checkDefaults.empty()) { TVector<TExprBase> columns; TCoArgument inLambdaArg(ctx.NewArgument(pos, "in_lambda_arg")); - auto missedFromMain = TCoArgument(ctx.NewArgument(pos, "missed_from_main")); + auto lookupRowArgument = TCoArgument(ctx.NewArgument(pos, "lookup_row")); - TVector<TExprBase> resCol; + TVector<TExprBase> existingRow, nonExistingRow; + auto getterFromValue = [&ctx, &pos] (const TStringBuf& x, const TCoArgument& arg) -> TExprBase { + return Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(x) + .Value<TCoMember>() + .Struct(arg) + .Name().Build(x) + .Build() + .Done(); + }; for (const auto& x : inputColumns) { - if (!missedKeyInput.contains(x)) { - resCol.emplace_back( - Build<TCoNameValueTuple>(ctx, pos) - .Name().Build(x) - .Value<TCoMember>() - .Struct(inLambdaArg) - .Name().Build(x) - .Build() - .Done()); + bool hasDefaultToCheck = checkDefaults.contains(x); + if (hasDefaultToCheck) { + existingRow.push_back(getterFromValue(x, lookupRowArgument)); + } else { + existingRow.emplace_back(getterFromValue(x, inLambdaArg)); } - } - TVector<TExprBase> resNullCol = resCol; + nonExistingRow.push_back(getterFromValue(x, inLambdaArg)); + } for (const auto& x : missedKeyInput) { auto atom = Build<TCoAtom>(ctx, pos) @@ -411,16 +421,8 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c auto columnType = table.GetColumnType(TString(x)); YQL_ENSURE(columnType); - resCol.emplace_back( - Build<TCoNameValueTuple>(ctx, pos) - .Name().Build(x) - .Value<TCoMember>() - .Struct(missedFromMain) - .Name().Build(x) - .Build() - .Done()); - - resNullCol.emplace_back( + existingRow.emplace_back(getterFromValue(x, lookupRowArgument)); + nonExistingRow.emplace_back( Build<TCoNameValueTuple>(ctx, pos) .Name().Build(x) .Value<TCoNothing>() @@ -436,8 +438,11 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c columns.emplace_back(atom); } - auto inPrecompute = PrecomputeCondenseInputResult(*condenseResult, pos, ctx); + for(const auto& x: checkDefaults) { + columns.push_back(Build<TCoAtom>(ctx, pos).Value(x).Done()); + } + auto inPrecompute = PrecomputeCondenseInputResult(*condenseResult, pos, ctx); auto precomputeTableLookupDict = PrecomputeTableLookupDict(inPrecompute, table, columns, pos, ctx, true); TVector<TExprBase> keyLookupTuples; @@ -475,13 +480,13 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c .Build() .Build() .PresentHandler<TCoLambda>() - .Args({missedFromMain}) + .Args({lookupRowArgument}) .Body<TCoAsStruct>() - .Add(resCol) + .Add(existingRow) .Build() .Build() .MissingValue<TCoAsStruct>() - .Add(resNullCol) + .Add(nonExistingRow) .Build() .Build() .Build() @@ -595,7 +600,7 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c } // namespace TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const TExprBase& inputRows, - const TCoAtomList& inputColumns, const TCoAtomList& returningColumns, const TKikimrTableDescription& table, + const TCoAtomList& inputColumns, const TCoAtomList& returningColumns, const TCoAtomList& columnsWithDefaults, const TKikimrTableDescription& table, const TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings, TPositionHandle pos, TExprContext& ctx) { switch (mode) { @@ -614,10 +619,15 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, inputColumnsSet.emplace(column.Value()); } + THashSet<TString> columnsWithDefaultsSet; + for(const auto& column: columnsWithDefaults) { + columnsWithDefaultsSet.emplace(column.Value()); + } + auto filter = (mode == TKqpPhyUpsertIndexMode::UpdateOn) ? &inputColumnsSet : nullptr; const auto indexes = BuildSecondaryIndexVector(table, pos, ctx, filter); - auto checkedInput = CheckUniqueConstraint(inputRows, inputColumnsSet, table, indexes, pos, ctx); + auto checkedInput = RewriteInputForConstraint(inputRows, inputColumnsSet, columnsWithDefaultsSet, table, indexes, pos, ctx); if (!checkedInput) { return {}; @@ -901,7 +911,7 @@ TExprBase KqpBuildUpsertIndexStages(TExprBase node, TExprContext& ctx, const TKq const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, upsert.Table().Path()); auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::Upsert, upsert.Input(), upsert.Columns(), - upsert.ReturningColumns(), table, upsert.Settings(), upsert.Pos(), ctx); + upsert.ReturningColumns(), upsert.GenerateColumnsIfInsert(), table, upsert.Settings(), upsert.Pos(), ctx); if (!effects) { return node; diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 3e0cab5eb0..b954da884d 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -465,6 +465,123 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { } } + Y_UNIT_TEST(AlterTableAddNotNullWithDefaultIndexed) { + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableSequences(false); + appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true); + + TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()).SetAppConfig(appConfig)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/AlterTableAddNotNullColumn` ( + Key Uint32, + Value String, + Value2 Int32 NOT NULL DEFAULT 1, + PRIMARY KEY (Key), + INDEX ByValue GLOBAL ON (Value) COVER (Value2) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + } + + auto fQuery = [&](TString query) -> TString { + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + auto result = + session + .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), + execSettings) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + if (result.GetResultSets().size() > 0) + return NYdb::FormatResultSetYson(result.GetResultSet(0)); + return ""; + }; + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old"); + )"); + + auto fCompareTable = [&](TString expected) { + TString query = R"( + SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key; + )"; + CompareYson(expected, fQuery(query)); + }; + + fCompareTable(R"( + [ + [[1u];["Old"];1] + ] + )"); + + fQuery(R"( + INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];1];[[2u];["New"];1] + ] + )"); + + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value, Value2) VALUES (2, "New", 2); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];1];[[2u];["New"];2] + ] + )"); + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "OldNew"); + )"); + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (3, "BrandNew"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];1];[[2u];["OldNew"];2];[[3u];["BrandNew"];1] + ] + )"); + + CompareYson( + fQuery("SELECT Value, Value2 FROM `/Root/AlterTableAddNotNullColumn` VIEW ByValue WHERE Value = \"OldNew\""), + R"( + [ + [["OldNew"];2] + ] + )" + ); + + CompareYson( + fQuery("SELECT Value, Value2 FROM `/Root/AlterTableAddNotNullColumn` VIEW ByValue WHERE Value = \"BrandNew\""), + R"( + [ + [["BrandNew"];1] + ] + )" + ); + + } + Y_UNIT_TEST(AlterTableAddNotNullWithDefault) { NKikimrConfig::TAppConfig appConfig; |