diff options
author | qrort <[email protected]> | 2023-11-02 16:12:13 +0300 |
---|---|---|
committer | qrort <[email protected]> | 2023-11-02 18:03:09 +0300 |
commit | caedc26b8b82943b93d408a0a566e6d795967fd9 (patch) | |
tree | 9f08a5901f6158cfceb0b439e60295d6543ef89a | |
parent | 5d953bb776b195f7d4709e425ba9646b2ca866b2 (diff) |
KIKIMR-19377: do not require not null columns in upsert for UPDATE TABLE
17 files changed, 544 insertions, 55 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index b7cf36d3a84..fc3f73eb261 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -267,15 +267,18 @@ void TKqpReadTableSettings::AddSkipNullKey(const TString& key) { SkipNullKeys.emplace_back(key); } -TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TKqpUpsertRows& node) { +TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TCoNameValueTupleList& settingsList) { TKqpUpsertRowsSettings settings; - for (const auto& tuple : node.Settings()) { + for (const auto& tuple : settingsList) { TStringBuf name = tuple.Name().Value(); - + if (name == TKqpUpsertRowsSettings::InplaceSettingName) { YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); settings.Inplace = true; + } else if (name == TKqpUpsertRowsSettings::IsUpdateSettingName) { + YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); + settings.IsUpdate = true; } else { YQL_ENSURE(false, "Unknown KqpUpsertRows setting name '" << name << "'"); } @@ -284,6 +287,10 @@ TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TKqpUpsertRows& node) return settings; } +TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const NNodes::TKqpUpsertRows& node) { + return TKqpUpsertRowsSettings::Parse(node.Settings()); +} + NNodes::TCoNameValueTupleList TKqpUpsertRowsSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const { TVector<TCoNameValueTuple> settings; settings.reserve(1); @@ -294,6 +301,12 @@ NNodes::TCoNameValueTupleList TKqpUpsertRowsSettings::BuildNode(TExprContext& ct .Name().Build(InplaceSettingName) .Done()); } + if (IsUpdate) { + settings.emplace_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(IsUpdateSettingName) + .Done()); + } return Build<TCoNameValueTupleList>(ctx, pos) .Add(settings) diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 3aab7b5d085..9f02ae325f9 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -78,11 +78,15 @@ struct TKqpReadTableSettings { struct TKqpUpsertRowsSettings { static constexpr TStringBuf InplaceSettingName = "Inplace"; + static constexpr TStringBuf IsUpdateSettingName = "IsUpdate"; bool Inplace = false; + bool IsUpdate = false; void SetInplace() { Inplace = true; } + void SetIsUpdate() { IsUpdate = true; } + static TKqpUpsertRowsSettings Parse(const NNodes::TCoNameValueTupleList& settingsList); static TKqpUpsertRowsSettings Parse(const NNodes::TKqpUpsertRows& node); NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; }; diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 09413f17157..3cb2588ccf5 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -245,12 +245,18 @@ { "Name": "TKqlUpsertRows", "Base": "TKqlUpsertRowsBase", - "Match": {"Type": "Callable", "Name": "KqlUpsertRows"} + "Match": {"Type": "Callable", "Name": "KqlUpsertRows"}, + "Children": [ + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + ] }, { "Name": "TKqlUpsertRowsIndex", "Base": "TKqlUpsertRowsBase", - "Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"} + "Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"}, + "Children": [ + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + ] }, { "Name": "TKqpUpsertRows", @@ -296,7 +302,10 @@ { "Name": "TKqlUpdateRowsIndex", "Base": "TKqlUpdateRowsBase", - "Match": {"Type": "Callable", "Name": "TKqlUpdateRowsIndex"} + "Match": {"Type": "Callable", "Name": "TKqlUpdateRowsIndex"}, + "Children": [ + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + ] }, { "Name": "TKqlInsertRows", diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 57c6677cb07..b4bdd2612a7 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -554,22 +554,38 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const } } - for (auto& [name, meta] : table.second->Metadata->Columns) { - if (meta.NotNull && !rowType->FindItem(name)) { - ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_NO_COLUMN_DEFAULT_VALUE, TStringBuilder() - << "Missing not null column in input: " << name - << ". All not null columns should be initialized")); - return TStatus::Error; - } - - - if (meta.NotNull && rowType->FindItemType(name)->HasOptionalOrNull()) { - if (rowType->FindItemType(name)->GetKind() != ETypeAnnotationKind::Pg) { - ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_BAD_COLUMN_TYPE, TStringBuilder() - << "Can't set optional or NULL value to not null column: " << name + TMaybeNode<TCoNameValueTupleList> settings; + TKqpUpsertRowsSettings upsertSettings; + if (TKqlUpsertRows::Match(node.Get()) && node->ChildrenSize() > TKqlUpsertRows::idx_Settings) { + settings = node->ChildPtr(TKqlUpsertRows::idx_Settings); + } + if (TKqlUpsertRowsIndex::Match(node.Get()) && node->ChildrenSize() > TKqlUpsertRowsIndex::idx_Settings) { + settings = node->ChildPtr(TKqlUpsertRowsIndex::idx_Settings); + } + if (TKqpUpsertRows::Match(node.Get())) /* here settings are not optional*/ { + settings = node->ChildPtr(TKqpUpsertRows::idx_Settings); + } + if (settings) { + upsertSettings = TKqpUpsertRowsSettings::Parse(settings.Cast()); + } + if (!upsertSettings.IsUpdate) { + for (auto& [name, meta] : table.second->Metadata->Columns) { + if (meta.NotNull && !rowType->FindItem(name)) { + ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_NO_COLUMN_DEFAULT_VALUE, TStringBuilder() + << "Missing not null column in input: " << name << ". All not null columns should be initialized")); return TStatus::Error; } + + + if (meta.NotNull && rowType->FindItemType(name)->HasOptionalOrNull()) { + if (rowType->FindItemType(name)->GetKind() != ETypeAnnotationKind::Pg) { + ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_BAD_COLUMN_TYPE, TStringBuilder() + << "Can't set optional or NULL value to not null column: " << name + << ". All not null columns should be initialized")); + return TStatus::Error; + } + } } } @@ -666,7 +682,11 @@ TStatus AnnotateInsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const TStatus AnnotateUpdateRows(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, const TKikimrTablesData& tablesData) { - if (!EnsureArgsCount(*node, 3, ctx)) { + if (!EnsureMinArgsCount(*node, 3, ctx)) { + return TStatus::Error; + } + + if (!EnsureMaxArgsCount(*node, 4, ctx)) { return TStatus::Error; } diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index d29889c1b76..02801dfe74b 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -249,6 +249,10 @@ TDqPhyPrecompute BuildPrecomputeStage(TExprBase expr, TExprContext& ctx) { bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const TCoArgument& inputArg, TMaybeNode<TExprBase>& stageInput, TMaybeNode<TExprBase>& effect) { + TKqpUpsertRowsSettings settings; + if (node.Settings()) { + settings = TKqpUpsertRowsSettings::Parse(node.Settings().Cast()); + } if (IsDqPureExpr(node.Input())) { stageInput = BuildPrecomputeStage(node.Input(), ctx); @@ -258,7 +262,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .List(inputArg) .Build() .Columns(node.Columns()) - .Settings().Build() + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); return true; } @@ -281,7 +285,6 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .Build() .Done(); - TKqpUpsertRowsSettings settings; settings.SetInplace(); effect = Build<TKqpUpsertRows>(ctx, node.Pos()) @@ -303,7 +306,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .List(inputArg) .Build() .Columns(node.Columns()) - .Settings().Build() + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); } diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 2ea11132ab4..fd1541270be 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -215,6 +215,14 @@ TExprNode::TPtr GetPgNotNullColumns( return pgNotNullColumns.Done().Ptr(); } +TExprNode::TPtr IsUpdateSetting(TExprContext& ctx, const TPositionHandle& pos) { + return Build<TCoNameValueTupleList>(ctx, pos) + .Add() + .Name().Build("IsUpdate") + .Build() + .Done().Ptr(); +} + TExprBase BuildKqlSequencer(TExprBase& input, const TKikimrTableDescription& table, const TCoAtomList& outputCols, const TCoAtomList& autoIncrement, TPositionHandle pos, TExprContext& ctx) @@ -371,6 +379,7 @@ TExprBase BuildUpdateOnTable(const TKiWriteTable& write, const TCoAtomList& inpu .Done(); } + TExprBase BuildUpdateOnTableWithIndex(const TKiWriteTable& write, const TCoAtomList& inputColumns, const TKikimrTableDescription& tableData, TExprContext& ctx) { @@ -381,6 +390,7 @@ TExprBase BuildUpdateOnTableWithIndex(const TKiWriteTable& write, const TCoAtomL .Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx)) .Build() .Columns(inputColumns) + .Settings(IsUpdateSetting(ctx, write.Pos())) .Done(); } @@ -564,6 +574,7 @@ TExprBase BuildUpdateTable(const TKiUpdateTable& update, const TKikimrTableDescr .Columns() .Add(updateColumnsList) .Build() + .Settings(IsUpdateSetting(ctx, update.Pos())) .Done(); } @@ -606,6 +617,7 @@ TVector<TExprBase> BuildUpdateTableWithIndex(const TKiUpdateTable& update, const .Columns<TCoAtomList>() .Add(updateColumnsList) .Build() + .Settings(IsUpdateSetting(ctx, update.Pos())) .Done(); effects.emplace_back(effect); @@ -623,6 +635,7 @@ TVector<TExprBase> BuildUpdateTableWithIndex(const TKiUpdateTable& update, const .Columns() .Add(updateColumnsList) .Build() + .Settings(IsUpdateSetting(ctx, update.Pos())) .Done(); effects.push_back(tableUpsert); @@ -688,6 +701,7 @@ TVector<TExprBase> BuildUpdateTableWithIndex(const TKiUpdateTable& update, const .Columns() .Add(indexColumnsList) .Build() + .Settings().Build() .Done(); effects.push_back(indexUpsert); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h index e7ebdb014e4..2336afed97e 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h @@ -66,6 +66,7 @@ enum class TKqpPhyUpsertIndexMode { NYql::NNodes::TMaybeNode<NYql::NNodes::TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const NYql::NNodes::TExprBase& inputRows, const NYql::NNodes::TCoAtomList& inputColumns, - const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx); + const NYql::TKikimrTableDescription& table, const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings, + NYql::TPositionHandle pos, NYql::TExprContext& ctx); } // NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp index e6c7c4b7593..811aad6e401 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp @@ -41,7 +41,7 @@ class TInsertUniqBuildHelper : public TUniqBuildHelper { public: TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx) - : TUniqBuildHelper(table, nullptr, pos, ctx, false) + : TUniqBuildHelper(table, nullptr, nullptr, pos, ctx, false) {} private: @@ -119,9 +119,9 @@ private: class TUpsertUniqBuildHelper : public TUniqBuildHelper { public: - TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TString>& usedIndexes, + TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx) - : TUniqBuildHelper(table, &usedIndexes, pos, ctx, true) + : TUniqBuildHelper(table, inputColumns, &usedIndexes, pos, ctx, true) , PkDict(MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), RowsListArg, pos, ctx)) {} @@ -243,8 +243,8 @@ private: } TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg, - const TKikimrTableDescription& table, const THashSet<TString>* usedIndexes, - TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) + const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, + const THashSet<TString>* usedIndexes, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) { TVector<TUniqCheckNodes> checks; @@ -265,8 +265,15 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr TVector<TCoAtom> skipNullColumns; skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size()); for (const auto& column : table.Metadata->Indexes[i].KeyColumns) { - TCoAtom atom(ctx.NewAtom(pos, column)); - skipNullColumns.emplace_back(atom); + if (!inputColumns || inputColumns->contains(column)) { + TCoAtom atom(ctx.NewAtom(pos, column)); + skipNullColumns.emplace_back(atom); + } + } + + //no columns to skip -> no index columns to check -> skip check + if (skipNullColumns.empty()) { + continue; } auto skipNull = Build<TCoSkipNullMembers>(ctx, pos) @@ -285,11 +292,11 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr return checks; } -TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TString>* usedIndexes, +TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) : RowsListArg(ctx.NewArgument(pos, "rows_list")) , False(MakeBool(pos, false, ctx)) - , Checks(Prepare(RowsListArg, table, usedIndexes, pos, ctx, skipPkCheck)) + , Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, skipPkCheck)) {} TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCoLambda& selector, @@ -509,13 +516,14 @@ namespace NKikimr::NKqp::NOpt { TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx) { - return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx); + return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx); } TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, + const THashSet<TStringBuf>* inputColumns, const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx) { - return std::make_unique<TUpsertUniqBuildHelper>(table, usedIndexes, pos, ctx); + return std::make_unique<TUpsertUniqBuildHelper>(table, inputColumns, usedIndexes, pos, ctx); } } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h index 392c1dcb764..65c5d07678c 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h @@ -12,8 +12,8 @@ namespace NKikimr::NKqp::NOpt { class TUniqBuildHelper { public: using TPtr = std::unique_ptr<TUniqBuildHelper>; + size_t GetChecksNum() const; - NYql::NNodes::TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult, NYql::TPositionHandle pos, NYql::TExprContext& ctx) const; NYql::NNodes::TDqPhyPrecompute CreateInputPrecompute(const NYql::NNodes::TDqStage& computeKeysStage, @@ -28,7 +28,7 @@ public: protected: // table - metadata of table // skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict - TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos, + TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool skipPkCheck); size_t CalcComputeKeysStageOutputNum() const; @@ -63,7 +63,7 @@ private: static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector, const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx); static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg, - const NYql::TKikimrTableDescription& table, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos, + const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool skipPkCheck); virtual NYql::NNodes::TDqCnUnionAll CreateLookupStageWithConnection(const NYql::NNodes::TDqStage& computeKeysStage, @@ -82,5 +82,6 @@ TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescr NYql::TPositionHandle pos, NYql::TExprContext& ctx); TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, + const THashSet<TStringBuf>* inputColumns, const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx); } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp index 06d4714c144..493c5ace997 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp @@ -159,6 +159,11 @@ TExprBase KqpBuildUpdateStages(TExprBase node, TExprContext& ctx, const TKqpOpti .Table(update.Table()) .Input(prepareUpdate) .Columns(update.Columns()) + .Settings() + .Add() + .Name().Build("IsUpdate") + .Build() + .Build() .Done(); return node; diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp index 0d1e7440cf1..d8159dc386f 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp @@ -16,7 +16,7 @@ TExprBase KqpBuildUpdateIndexStages(TExprBase node, TExprContext& ctx, const TKq const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, update.Table().Path()); auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::UpdateOn, update.Input(), update.Columns(), - table, update.Pos(), ctx); + table, update.Settings(), update.Pos(), ctx); if (!effects) { return node; diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index d5b2d71cd7d..92a8e66708f 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -301,7 +301,7 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput .Done(); } -TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns, +TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns, bool checkOnlyGivenColumns, const TKikimrTableDescription& table, const TSecondaryIndexes& indexes, TPositionHandle pos, TExprContext& ctx) { auto condenseResult = CondenseInput(inputRows, ctx); @@ -324,7 +324,7 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c } } - auto helper = CreateUpsertUniqBuildHelper(table, usedIndexes, pos, ctx); + auto helper = CreateUpsertUniqBuildHelper(table, checkOnlyGivenColumns ? &inputColumns : nullptr, usedIndexes, pos, ctx); if (helper->GetChecksNum() == 0) { return condenseResult; } @@ -414,7 +414,8 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c } // namespace TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const TExprBase& inputRows, - const TCoAtomList& inputColumns, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) + const TCoAtomList& inputColumns, const TKikimrTableDescription& table, + const TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings, TPositionHandle pos, TExprContext& ctx) { switch (mode) { case TKqpPhyUpsertIndexMode::Upsert: @@ -432,10 +433,21 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, inputColumnsSet.emplace(column.Value()); } + bool checkOnlyGivenColumns = false; + if (settings) { + for (const auto& setting : settings.Cast()) { + if (setting.Name().Value() == "IsUpdate") { + checkOnlyGivenColumns = true; + break; + } + } + } + auto filter = (mode == TKqpPhyUpsertIndexMode::UpdateOn) ? &inputColumnsSet : nullptr; const auto indexes = BuildSecondaryIndexVector(table, pos, ctx, filter); - auto checkedInput = CheckUniqueConstraint(inputRows, inputColumnsSet, table, indexes, pos, ctx); + auto checkedInput = CheckUniqueConstraint(inputRows, inputColumnsSet, checkOnlyGivenColumns, table, indexes, pos, ctx); + if (!checkedInput) { return {}; } @@ -463,6 +475,7 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, .Table(BuildTableMeta(table, pos, ctx)) .Input(tableUpsertRows) .Columns(inputColumns) + .Settings(settings) .Done(); TVector<TExprBase> effects; @@ -543,7 +556,7 @@ TExprBase KqpBuildUpsertIndexStages(TExprBase node, TExprContext& ctx, const TKq const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, upsert.Table().Path()); auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::Upsert, upsert.Input(), upsert.Columns(), - table, upsert.Pos(), ctx); + table, upsert.Settings(), upsert.Pos(), ctx); if (!effects) { return node; diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp index 92fd414df23..c7142edf73b 100644 --- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp @@ -356,6 +356,8 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext& [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) { TKqpUpsertRows upsertRows(&node); + auto settings = TKqpUpsertRowsSettings::Parse(upsertRows); + const auto& tableMeta = ctx.GetTableMeta(upsertRows.Table()); auto rows = MkqlBuildExpr(upsertRows.Input().Ref(), buildCtx); @@ -381,7 +383,7 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext& TVector<TStringBuf> upsertColumns(upsertSet.begin(), upsertSet.end()); auto result = ctx.PgmBuilder().KqpUpsertRows(MakeTableId(upsertRows.Table()), rows, - GetKqpColumns(tableMeta, upsertColumns, false)); + GetKqpColumns(tableMeta, upsertColumns, false), settings.IsUpdate); return result; }); diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp index e2437fc33eb..dd928eb55df 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.cpp +++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp @@ -268,7 +268,7 @@ TRuntimeNode TKqpProgramBuilder::KqpLookupTable(const TTableId& tableId, const T } TRuntimeNode TKqpProgramBuilder::KqpUpsertRows(const TTableId& tableId, const TRuntimeNode& rows, - const TArrayRef<TKqpTableColumn>& upsertColumns) + const TArrayRef<TKqpTableColumn>& upsertColumns, bool isUpdate) { auto streamType = AS_TYPE(TStreamType, rows.GetStaticType()); auto rowType = AS_TYPE(TStructType, streamType->GetItemType()); @@ -279,7 +279,7 @@ TRuntimeNode TKqpProgramBuilder::KqpUpsertRows(const TTableId& tableId, const TR builder.Add(BuildTableIdLiteral(tableId, *this)); builder.Add(rows); builder.Add(BuildColumnIndicesMap(*this, *rowType, upsertColumns)); - + builder.Add(this->NewDataLiteral<bool>(isUpdate)); return TRuntimeNode(builder.Build(), false); } diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h b/ydb/core/kqp/runtime/kqp_program_builder.h index 3c4825001f0..f52cea905f1 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.h +++ b/ydb/core/kqp/runtime/kqp_program_builder.h @@ -62,7 +62,7 @@ public: const TArrayRef<TKqpTableColumn>& keyColumns, const TArrayRef<TKqpTableColumn>& columns); TRuntimeNode KqpUpsertRows(const TTableId& tableId, const TRuntimeNode& rows, - const TArrayRef<TKqpTableColumn>& upsertColumns); + const TArrayRef<TKqpTableColumn>& upsertColumns, bool isUpdate); TRuntimeNode KqpDeleteRows(const TTableId& tableId, const TRuntimeNode& rows); diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp index e58d78fc33c..330ee560d1f 100644 --- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp @@ -9,6 +9,87 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; +namespace { + +void CreateTableWithMultishardIndex(Tests::TClient& client, NKikimrSchemeOp::EIndexType type) { + const TString scheme = R"(Name: "MultiShardIndexed" + Columns { Name: "key" Type: "Uint64" NotNull: true } + Columns { Name: "fk" Type: "Uint32" NotNull: true } + Columns { Name: "fk2" Type: "Uint32" NotNull: true } + Columns { Name: "value" Type: "String" NotNull: true } + KeyColumnNames: ["key"] + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 3 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 100 } } } } + )"; + + NKikimrSchemeOp::TTableDescription desc; + bool parseOk = ::google::protobuf::TextFormat::ParseFromString(scheme, &desc); + UNIT_ASSERT(parseOk); + + auto status = client.TClient::CreateTableWithUniformShardedIndex("/Root", desc, "index", {"fk", "fk2"}, type); + UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK); +} + +void TestUpdateWithoutChangingNotNullColumn(TSession& session) { + { /* init table */ + const auto query = Q_(R"( + UPSERT INTO t (id, val, created_on) VALUES + (123, 'xxx', 1); + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { /* update not null column */ + const auto query = Q_("UPDATE t SET val = 'a' WHERE id = 123;"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = session.ExecuteDataQuery(R"( + SELECT * FROM t; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[1u;123u;["a"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + + { /* update not null column */ + const auto query = Q_("UPDATE t SET val = 'a', created_on = NULL WHERE id = 123;"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } + + { /* update on not null column */ + const auto query = Q_(R"( + $to_update = ( + SELECT id, 'b' AS val FROM t + WHERE id = 123 + ); + UPDATE t ON SELECT * FROM $to_update; + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = session.ExecuteDataQuery(R"( + SELECT * FROM t; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[1u;123u;["b"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + + { /* update on not null column */ + const auto query = Q_(R"( + $to_update = ( + SELECT id, 'b' AS val, NULL as created_on FROM t + WHERE id = 123 + ); + UPDATE t ON SELECT * FROM $to_update; + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } +} + +} // namespace + Y_UNIT_TEST_SUITE(KqpNotNullColumns) { Y_UNIT_TEST(CreateTableWithDisabledNotNullDataColumns) { TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(false)); @@ -481,6 +562,51 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { } } + Y_UNIT_TEST(InsertFromSelect) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false) + .SetEnableNotNullDataColumns(true); + + TKikimrRunner kikimr(settings); + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + + { + const auto q1 = Q_(R"( + CREATE TABLE `/Root/TestInsertNotNull` ( + Key Uint64, + Value String NOT NULL, + PRIMARY KEY (Key)) + )"); + + auto result = session.ExecuteSchemeQuery(q1).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto q2 = Q_(R"( + CREATE TABLE `/Root/TestInsert` ( + Key Uint64, + Value String, + PRIMARY KEY (Key)) + )"); + + result = session.ExecuteSchemeQuery(q2).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto query = Q_("INSERT INTO `/Root/TestInsert` (Key, Value) VALUES (1, NULL)"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { /* missing not null column */ + const auto query = Q_("INSERT INTO `/Root/TestInsertNotNull` (Key, Value) SELECT Key, Value FROM `/Root/TestInsert`"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString()); + } + } + Y_UNIT_TEST(InsertNotNullPg) { auto settings = TKikimrSettings() .SetWithSampleTables(false) @@ -691,7 +817,6 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { } } - Y_UNIT_TEST(UpdateNotNull) { auto settings = TKikimrSettings() .SetWithSampleTables(false) @@ -744,6 +869,274 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) { UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString()); } } + + Y_UNIT_TEST(UpdateTable_DontChangeNotNull) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + + TKikimrRunner kikimr(settings); + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + { + const auto query = Q_(R"( + CREATE TABLE t + ( + id Uint64 NOT NULL, + val String, + created_on Uint64 NOT NULL, + PRIMARY KEY(id) + ); + )"); + + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + TestUpdateWithoutChangingNotNullColumn(session); + } + + Y_UNIT_TEST(UpdateTable_DontChangeNotNullWithIndex) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + + TKikimrRunner kikimr(settings); + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + { + const auto query = Q_(R"( + CREATE TABLE t + ( + id Uint64 NOT NULL, + val String, + created_on Uint64 NOT NULL, + PRIMARY KEY(id), + INDEX Index GLOBAL SYNC ON (id, val) + ); + )"); + + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + TestUpdateWithoutChangingNotNullColumn(session); + } + + Y_UNIT_TEST(UpdateTable_UniqIndex) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + CreateTableWithMultishardIndex(kikimr.GetTestClient(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + { + const auto query = Q_(R"( + UPSERT INTO `/Root/MultiShardIndexed` (key, fk, fk2, value) VALUES + (123, 1, 1, 'v1'); + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { /* update without not null column */ + const auto query = Q_("UPDATE `/Root/MultiShardIndexed` SET value = 'a' WHERE key = 123;"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = session.ExecuteDataQuery(R"( + SELECT * FROM `/Root/MultiShardIndexed`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[1u;1u;123u;"a"]])", FormatResultSetYson(result.GetResultSet(0))); + } + + { /* update on without not null column */ + const auto query = Q_(R"( + $to_update = ( + SELECT key, 'b' as value FROM `/Root/MultiShardIndexed` + WHERE key = 123 + ); + UPDATE `/Root/MultiShardIndexed` ON SELECT * FROM $to_update; + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = session.ExecuteDataQuery(R"( + SELECT * FROM `/Root/MultiShardIndexed`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[1u;1u;123u;"b"]])", FormatResultSetYson(result.GetResultSet(0))); + } + + { /* same fk */ + const auto query = Q_(R"( + UPSERT INTO `/Root/MultiShardIndexed` (key, fk, fk2, value) VALUES + (124, 1, 1, 'v2'); + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + + { /* update not null column */ + const auto query = Q_("UPDATE `/Root/MultiShardIndexed` SET value = 'a', fk = NULL WHERE key = 123;"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } + + { /* update on select */ + const auto query = Q_(R"( + UPDATE `/Root/MultiShardIndexed` ON SELECT * FROM `/Root/MultiShardIndexed` WHERE key = 123; + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = session.ExecuteDataQuery(R"( + SELECT * FROM `/Root/MultiShardIndexed`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[1u;1u;123u;"b"]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(UpdateTable_UniqIndexPg) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false)); + auto db = kikimr.GetQueryClient(); + auto tableClient = kikimr.GetTableClient(); + auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg); + { + auto result = db.ExecuteQuery(R"( + CREATE TABLE t( + id int4 primary key, + value int4, + label text NOT NULL, + label2 text NOT NULL, + side int4 NOT NULL, + constraint uniq1 unique (value, label), + constraint uniq2 unique (label2) + ); + )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + INSERT INTO t VALUES (1, 1, 'label1_1', 'label2_1', 1), (2, 2, 'label1_2', 'label2_2', 2); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE t SET value = 100 WHERE id = 1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = db.ExecuteQuery(R"( + SELECT * FROM t; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([["1";"100";"label1_1";"label2_1";"1"];["2";"2";"label1_2";"label2_2";"2"]])", FormatResultSetYson(result.GetResultSet(0))); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE t SET label2 = 'label2_1' WHERE id = 1; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = db.ExecuteQuery(R"( + SELECT * FROM t; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([["1";"100";"label1_1";"label2_1";"1"];["2";"2";"label1_2";"label2_2";"2"]])", FormatResultSetYson(result.GetResultSet(0))); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE t SET side = id + 1, label = 'new_label'; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = db.ExecuteQuery(R"( + SELECT * FROM t; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([["1";"100";"new_label";"label2_1";"2"];["2";"2";"new_label";"label2_2";"3"]])", FormatResultSetYson(result.GetResultSet(0))); + } + { + auto result = db.ExecuteQuery(R"( + UPDATE t SET value = 100, label = 'new_label' WHERE id = 2; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + result = db.ExecuteQuery(R"( + UPDATE t SET label2 = 'new_label'; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(UpdateTable_Immediate) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + + TKikimrRunner kikimr(settings); + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + { + const auto query = Q_(R"( + CREATE TABLE t + ( + id Uint64 NOT NULL, + val String, + created_on Uint64 NOT NULL, + PRIMARY KEY(id), + INDEX Index GLOBAL SYNC ON (id, val) + ); + )"); + + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const auto query = Q_(R"( + UPSERT INTO t (id, val, created_on) VALUES + (123, 'xxx', 1), + (124, 'yyy', 2); + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + UPDATE t SET val = 'yyy' WHERE id = 123; + DELETE FROM t WHERE val = 'yyy'; + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = session.ExecuteDataQuery(R"( + SELECT * FROM t; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + } + { + const auto query = Q_(R"( + UPSERT INTO t (id, val, created_on) VALUES + (123, 'xxx', 1), + (124, 'yyy', 2); + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + const auto query = Q_(R"( + UPDATE t SET created_on = 11; + DELETE FROM t WHERE val = 'xxx'; + UPDATE t SET val = 'abc' WHERE created_on = 11; + )"); + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + result = session.ExecuteDataQuery(R"( + SELECT * FROM t; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[11u;124u;["abc"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } Y_UNIT_TEST(UpdateNotNullPg) { auto settings = TKikimrSettings() diff --git a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp index 60f8b835933..d94235e9ef9 100644 --- a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp @@ -169,12 +169,13 @@ private: IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeFactoryContext& ctx, TKqpDatashardComputeContext& computeCtx) { - MKQL_ENSURE_S(callable.GetInputsCount() >= 3); + MKQL_ENSURE_S(callable.GetInputsCount() >= 4); auto tableNode = callable.GetInput(0); auto rowsNode = callable.GetInput(1); auto upsertColumnsNode = callable.GetInput(2); - + auto isUpdateNode = callable.GetInput(3); + bool isUpdate = AS_VALUE(TDataLiteral, isUpdateNode)->AsValue().Get<bool>(); auto tableId = NKqp::ParseTableId(tableNode); auto tableInfo = computeCtx.GetTable(tableId); MKQL_ENSURE(tableInfo, "Table not found: " << tableId.PathId.ToString()); @@ -238,14 +239,16 @@ IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeF } for (const auto& [_, column] : tableInfo->Columns) { - if (column.NotNull) { + if (column.NotNull && !isUpdate) { auto it = inputIndex.find(column.Name); MKQL_ENSURE(it != inputIndex.end(), "Not null column " << column.Name << " has to be specified in upsert"); - auto columnType = rowType->GetMemberType(it->second); - MKQL_ENSURE(columnType->GetKind() != NMiniKQL::TType::EKind::Optional, - "Not null column " << column.Name << " can't be optional"); + if (it != inputIndex.end()) { + auto columnType = rowType->GetMemberType(it->second); + MKQL_ENSURE(columnType->GetKind() != NMiniKQL::TType::EKind::Optional, + "Not null column " << column.Name << " can't be optional"); + } } } |