summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <[email protected]>2023-11-02 16:12:13 +0300
committerqrort <[email protected]>2023-11-02 18:03:09 +0300
commitcaedc26b8b82943b93d408a0a566e6d795967fd9 (patch)
tree9f08a5901f6158cfceb0b439e60295d6543ef89a
parent5d953bb776b195f7d4709e425ba9646b2ca866b2 (diff)
KIKIMR-19377: do not require not null columns in upsert for UPDATE TABLE
-rw-r--r--ydb/core/kqp/common/kqp_yql.cpp19
-rw-r--r--ydb/core/kqp/common/kqp_yql.h4
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json15
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp48
-rw-r--r--ydb/core/kqp/opt/kqp_opt_effects.cpp9
-rw-r--r--ydb/core/kqp/opt/kqp_opt_kql.cpp14
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h3
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp30
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h7
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp5
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp2
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp23
-rw-r--r--ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_program_builder.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_program_builder.h2
-rw-r--r--ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp395
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp15
17 files changed, 544 insertions, 55 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index b7cf36d3a84..fc3f73eb261 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -267,15 +267,18 @@ void TKqpReadTableSettings::AddSkipNullKey(const TString& key) {
SkipNullKeys.emplace_back(key);
}
-TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TKqpUpsertRows& node) {
+TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TCoNameValueTupleList& settingsList) {
TKqpUpsertRowsSettings settings;
- for (const auto& tuple : node.Settings()) {
+ for (const auto& tuple : settingsList) {
TStringBuf name = tuple.Name().Value();
-
+
if (name == TKqpUpsertRowsSettings::InplaceSettingName) {
YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
settings.Inplace = true;
+ } else if (name == TKqpUpsertRowsSettings::IsUpdateSettingName) {
+ YQL_ENSURE(tuple.Ref().ChildrenSize() == 1);
+ settings.IsUpdate = true;
} else {
YQL_ENSURE(false, "Unknown KqpUpsertRows setting name '" << name << "'");
}
@@ -284,6 +287,10 @@ TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TKqpUpsertRows& node)
return settings;
}
+TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const NNodes::TKqpUpsertRows& node) {
+ return TKqpUpsertRowsSettings::Parse(node.Settings());
+}
+
NNodes::TCoNameValueTupleList TKqpUpsertRowsSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const {
TVector<TCoNameValueTuple> settings;
settings.reserve(1);
@@ -294,6 +301,12 @@ NNodes::TCoNameValueTupleList TKqpUpsertRowsSettings::BuildNode(TExprContext& ct
.Name().Build(InplaceSettingName)
.Done());
}
+ if (IsUpdate) {
+ settings.emplace_back(
+ Build<TCoNameValueTuple>(ctx, pos)
+ .Name().Build(IsUpdateSettingName)
+ .Done());
+ }
return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 3aab7b5d085..9f02ae325f9 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -78,11 +78,15 @@ struct TKqpReadTableSettings {
struct TKqpUpsertRowsSettings {
static constexpr TStringBuf InplaceSettingName = "Inplace";
+ static constexpr TStringBuf IsUpdateSettingName = "IsUpdate";
bool Inplace = false;
+ bool IsUpdate = false;
void SetInplace() { Inplace = true; }
+ void SetIsUpdate() { IsUpdate = true; }
+ static TKqpUpsertRowsSettings Parse(const NNodes::TCoNameValueTupleList& settingsList);
static TKqpUpsertRowsSettings Parse(const NNodes::TKqpUpsertRows& node);
NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const;
};
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 09413f17157..3cb2588ccf5 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -245,12 +245,18 @@
{
"Name": "TKqlUpsertRows",
"Base": "TKqlUpsertRowsBase",
- "Match": {"Type": "Callable", "Name": "KqlUpsertRows"}
+ "Match": {"Type": "Callable", "Name": "KqlUpsertRows"},
+ "Children": [
+ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+ ]
},
{
"Name": "TKqlUpsertRowsIndex",
"Base": "TKqlUpsertRowsBase",
- "Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"}
+ "Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"},
+ "Children": [
+ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+ ]
},
{
"Name": "TKqpUpsertRows",
@@ -296,7 +302,10 @@
{
"Name": "TKqlUpdateRowsIndex",
"Base": "TKqlUpdateRowsBase",
- "Match": {"Type": "Callable", "Name": "TKqlUpdateRowsIndex"}
+ "Match": {"Type": "Callable", "Name": "TKqlUpdateRowsIndex"},
+ "Children": [
+ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+ ]
},
{
"Name": "TKqlInsertRows",
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index 57c6677cb07..b4bdd2612a7 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -554,22 +554,38 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const
}
}
- for (auto& [name, meta] : table.second->Metadata->Columns) {
- if (meta.NotNull && !rowType->FindItem(name)) {
- ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_NO_COLUMN_DEFAULT_VALUE, TStringBuilder()
- << "Missing not null column in input: " << name
- << ". All not null columns should be initialized"));
- return TStatus::Error;
- }
-
-
- if (meta.NotNull && rowType->FindItemType(name)->HasOptionalOrNull()) {
- if (rowType->FindItemType(name)->GetKind() != ETypeAnnotationKind::Pg) {
- ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_BAD_COLUMN_TYPE, TStringBuilder()
- << "Can't set optional or NULL value to not null column: " << name
+ TMaybeNode<TCoNameValueTupleList> settings;
+ TKqpUpsertRowsSettings upsertSettings;
+ if (TKqlUpsertRows::Match(node.Get()) && node->ChildrenSize() > TKqlUpsertRows::idx_Settings) {
+ settings = node->ChildPtr(TKqlUpsertRows::idx_Settings);
+ }
+ if (TKqlUpsertRowsIndex::Match(node.Get()) && node->ChildrenSize() > TKqlUpsertRowsIndex::idx_Settings) {
+ settings = node->ChildPtr(TKqlUpsertRowsIndex::idx_Settings);
+ }
+ if (TKqpUpsertRows::Match(node.Get())) /* here settings are not optional*/ {
+ settings = node->ChildPtr(TKqpUpsertRows::idx_Settings);
+ }
+ if (settings) {
+ upsertSettings = TKqpUpsertRowsSettings::Parse(settings.Cast());
+ }
+ if (!upsertSettings.IsUpdate) {
+ for (auto& [name, meta] : table.second->Metadata->Columns) {
+ if (meta.NotNull && !rowType->FindItem(name)) {
+ ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_NO_COLUMN_DEFAULT_VALUE, TStringBuilder()
+ << "Missing not null column in input: " << name
<< ". All not null columns should be initialized"));
return TStatus::Error;
}
+
+
+ if (meta.NotNull && rowType->FindItemType(name)->HasOptionalOrNull()) {
+ if (rowType->FindItemType(name)->GetKind() != ETypeAnnotationKind::Pg) {
+ ctx.AddError(YqlIssue(ctx.GetPosition(node->Pos()), TIssuesIds::KIKIMR_BAD_COLUMN_TYPE, TStringBuilder()
+ << "Can't set optional or NULL value to not null column: " << name
+ << ". All not null columns should be initialized"));
+ return TStatus::Error;
+ }
+ }
}
}
@@ -666,7 +682,11 @@ TStatus AnnotateInsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const
TStatus AnnotateUpdateRows(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData)
{
- if (!EnsureArgsCount(*node, 3, ctx)) {
+ if (!EnsureMinArgsCount(*node, 3, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureMaxArgsCount(*node, 4, ctx)) {
return TStatus::Error;
}
diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp
index d29889c1b76..02801dfe74b 100644
--- a/ydb/core/kqp/opt/kqp_opt_effects.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp
@@ -249,6 +249,10 @@ TDqPhyPrecompute BuildPrecomputeStage(TExprBase expr, TExprContext& ctx) {
bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
const TCoArgument& inputArg, TMaybeNode<TExprBase>& stageInput, TMaybeNode<TExprBase>& effect)
{
+ TKqpUpsertRowsSettings settings;
+ if (node.Settings()) {
+ settings = TKqpUpsertRowsSettings::Parse(node.Settings().Cast());
+ }
if (IsDqPureExpr(node.Input())) {
stageInput = BuildPrecomputeStage(node.Input(), ctx);
@@ -258,7 +262,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.List(inputArg)
.Build()
.Columns(node.Columns())
- .Settings().Build()
+ .Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
return true;
}
@@ -281,7 +285,6 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.Build()
.Done();
- TKqpUpsertRowsSettings settings;
settings.SetInplace();
effect = Build<TKqpUpsertRows>(ctx, node.Pos())
@@ -303,7 +306,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
.List(inputArg)
.Build()
.Columns(node.Columns())
- .Settings().Build()
+ .Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index 2ea11132ab4..fd1541270be 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -215,6 +215,14 @@ TExprNode::TPtr GetPgNotNullColumns(
return pgNotNullColumns.Done().Ptr();
}
+TExprNode::TPtr IsUpdateSetting(TExprContext& ctx, const TPositionHandle& pos) {
+ return Build<TCoNameValueTupleList>(ctx, pos)
+ .Add()
+ .Name().Build("IsUpdate")
+ .Build()
+ .Done().Ptr();
+}
+
TExprBase BuildKqlSequencer(TExprBase& input, const TKikimrTableDescription& table,
const TCoAtomList& outputCols, const TCoAtomList& autoIncrement,
TPositionHandle pos, TExprContext& ctx)
@@ -371,6 +379,7 @@ TExprBase BuildUpdateOnTable(const TKiWriteTable& write, const TCoAtomList& inpu
.Done();
}
+
TExprBase BuildUpdateOnTableWithIndex(const TKiWriteTable& write, const TCoAtomList& inputColumns,
const TKikimrTableDescription& tableData, TExprContext& ctx)
{
@@ -381,6 +390,7 @@ TExprBase BuildUpdateOnTableWithIndex(const TKiWriteTable& write, const TCoAtomL
.Columns(GetPgNotNullColumns(tableData, write.Pos(), ctx))
.Build()
.Columns(inputColumns)
+ .Settings(IsUpdateSetting(ctx, write.Pos()))
.Done();
}
@@ -564,6 +574,7 @@ TExprBase BuildUpdateTable(const TKiUpdateTable& update, const TKikimrTableDescr
.Columns()
.Add(updateColumnsList)
.Build()
+ .Settings(IsUpdateSetting(ctx, update.Pos()))
.Done();
}
@@ -606,6 +617,7 @@ TVector<TExprBase> BuildUpdateTableWithIndex(const TKiUpdateTable& update, const
.Columns<TCoAtomList>()
.Add(updateColumnsList)
.Build()
+ .Settings(IsUpdateSetting(ctx, update.Pos()))
.Done();
effects.emplace_back(effect);
@@ -623,6 +635,7 @@ TVector<TExprBase> BuildUpdateTableWithIndex(const TKiUpdateTable& update, const
.Columns()
.Add(updateColumnsList)
.Build()
+ .Settings(IsUpdateSetting(ctx, update.Pos()))
.Done();
effects.push_back(tableUpsert);
@@ -688,6 +701,7 @@ TVector<TExprBase> BuildUpdateTableWithIndex(const TKiUpdateTable& update, const
.Columns()
.Add(indexColumnsList)
.Build()
+ .Settings().Build()
.Done();
effects.push_back(indexUpsert);
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
index e7ebdb014e4..2336afed97e 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
@@ -66,6 +66,7 @@ enum class TKqpPhyUpsertIndexMode {
NYql::NNodes::TMaybeNode<NYql::NNodes::TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
const NYql::NNodes::TExprBase& inputRows, const NYql::NNodes::TCoAtomList& inputColumns,
- const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
+ const NYql::TKikimrTableDescription& table, const NYql::NNodes::TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings,
+ NYql::TPositionHandle pos, NYql::TExprContext& ctx);
} // NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
index e6c7c4b7593..811aad6e401 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
@@ -41,7 +41,7 @@ class TInsertUniqBuildHelper : public TUniqBuildHelper {
public:
TInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
NYql::TExprContext& ctx)
- : TUniqBuildHelper(table, nullptr, pos, ctx, false)
+ : TUniqBuildHelper(table, nullptr, nullptr, pos, ctx, false)
{}
private:
@@ -119,9 +119,9 @@ private:
class TUpsertUniqBuildHelper : public TUniqBuildHelper {
public:
- TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TString>& usedIndexes,
+ TUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>& usedIndexes,
NYql::TPositionHandle pos, NYql::TExprContext& ctx)
- : TUniqBuildHelper(table, &usedIndexes, pos, ctx, true)
+ : TUniqBuildHelper(table, inputColumns, &usedIndexes, pos, ctx, true)
, PkDict(MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), RowsListArg, pos, ctx))
{}
@@ -243,8 +243,8 @@ private:
}
TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg,
- const TKikimrTableDescription& table, const THashSet<TString>* usedIndexes,
- TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
+ const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns,
+ const THashSet<TString>* usedIndexes, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
{
TVector<TUniqCheckNodes> checks;
@@ -265,8 +265,15 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
TVector<TCoAtom> skipNullColumns;
skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());
for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
- TCoAtom atom(ctx.NewAtom(pos, column));
- skipNullColumns.emplace_back(atom);
+ if (!inputColumns || inputColumns->contains(column)) {
+ TCoAtom atom(ctx.NewAtom(pos, column));
+ skipNullColumns.emplace_back(atom);
+ }
+ }
+
+ //no columns to skip -> no index columns to check -> skip check
+ if (skipNullColumns.empty()) {
+ continue;
}
auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
@@ -285,11 +292,11 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr
return checks;
}
-TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TString>* usedIndexes,
+TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes,
TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
: RowsListArg(ctx.NewArgument(pos, "rows_list"))
, False(MakeBool(pos, false, ctx))
- , Checks(Prepare(RowsListArg, table, usedIndexes, pos, ctx, skipPkCheck))
+ , Checks(Prepare(RowsListArg, table, inputColumns, usedIndexes, pos, ctx, skipPkCheck))
{}
TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCoLambda& selector,
@@ -509,13 +516,14 @@ namespace NKikimr::NKqp::NOpt {
TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
NYql::TExprContext& ctx)
{
- return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx);
+ return std::make_unique<TInsertUniqBuildHelper>(table, pos, ctx);
}
TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
+ const THashSet<TStringBuf>* inputColumns,
const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx)
{
- return std::make_unique<TUpsertUniqBuildHelper>(table, usedIndexes, pos, ctx);
+ return std::make_unique<TUpsertUniqBuildHelper>(table, inputColumns, usedIndexes, pos, ctx);
}
}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
index 392c1dcb764..65c5d07678c 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
@@ -12,8 +12,8 @@ namespace NKikimr::NKqp::NOpt {
class TUniqBuildHelper {
public:
using TPtr = std::unique_ptr<TUniqBuildHelper>;
+
size_t GetChecksNum() const;
-
NYql::NNodes::TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult,
NYql::TPositionHandle pos, NYql::TExprContext& ctx) const;
NYql::NNodes::TDqPhyPrecompute CreateInputPrecompute(const NYql::NNodes::TDqStage& computeKeysStage,
@@ -28,7 +28,7 @@ public:
protected:
// table - metadata of table
// skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict
- TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
+ TUniqBuildHelper(const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
NYql::TExprContext& ctx, bool skipPkCheck);
size_t CalcComputeKeysStageOutputNum() const;
@@ -63,7 +63,7 @@ private:
static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector,
const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg,
- const NYql::TKikimrTableDescription& table, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
+ const NYql::TKikimrTableDescription& table, const THashSet<TStringBuf>* inputColumns, const THashSet<TString>* usedIndexes, NYql::TPositionHandle pos,
NYql::TExprContext& ctx, bool skipPkCheck);
virtual NYql::NNodes::TDqCnUnionAll CreateLookupStageWithConnection(const NYql::NNodes::TDqStage& computeKeysStage,
@@ -82,5 +82,6 @@ TUniqBuildHelper::TPtr CreateInsertUniqBuildHelper(const NYql::TKikimrTableDescr
NYql::TPositionHandle pos, NYql::TExprContext& ctx);
TUniqBuildHelper::TPtr CreateUpsertUniqBuildHelper(const NYql::TKikimrTableDescription& table,
+ const THashSet<TStringBuf>* inputColumns,
const THashSet<TString>& usedIndexes, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
index 06d4714c144..493c5ace997 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
@@ -159,6 +159,11 @@ TExprBase KqpBuildUpdateStages(TExprBase node, TExprContext& ctx, const TKqpOpti
.Table(update.Table())
.Input(prepareUpdate)
.Columns(update.Columns())
+ .Settings()
+ .Add()
+ .Name().Build("IsUpdate")
+ .Build()
+ .Build()
.Done();
return node;
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp
index 0d1e7440cf1..d8159dc386f 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp
@@ -16,7 +16,7 @@ TExprBase KqpBuildUpdateIndexStages(TExprBase node, TExprContext& ctx, const TKq
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, update.Table().Path());
auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::UpdateOn, update.Input(), update.Columns(),
- table, update.Pos(), ctx);
+ table, update.Settings(), update.Pos(), ctx);
if (!effects) {
return node;
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
index d5b2d71cd7d..92a8e66708f 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
@@ -301,7 +301,7 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
.Done();
}
-TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns,
+TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns, bool checkOnlyGivenColumns,
const TKikimrTableDescription& table, const TSecondaryIndexes& indexes, TPositionHandle pos, TExprContext& ctx)
{
auto condenseResult = CondenseInput(inputRows, ctx);
@@ -324,7 +324,7 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
}
}
- auto helper = CreateUpsertUniqBuildHelper(table, usedIndexes, pos, ctx);
+ auto helper = CreateUpsertUniqBuildHelper(table, checkOnlyGivenColumns ? &inputColumns : nullptr, usedIndexes, pos, ctx);
if (helper->GetChecksNum() == 0) {
return condenseResult;
}
@@ -414,7 +414,8 @@ TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, c
} // namespace
TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const TExprBase& inputRows,
- const TCoAtomList& inputColumns, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx)
+ const TCoAtomList& inputColumns, const TKikimrTableDescription& table,
+ const TMaybeNode<NYql::NNodes::TCoNameValueTupleList>& settings, TPositionHandle pos, TExprContext& ctx)
{
switch (mode) {
case TKqpPhyUpsertIndexMode::Upsert:
@@ -432,10 +433,21 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
inputColumnsSet.emplace(column.Value());
}
+ bool checkOnlyGivenColumns = false;
+ if (settings) {
+ for (const auto& setting : settings.Cast()) {
+ if (setting.Name().Value() == "IsUpdate") {
+ checkOnlyGivenColumns = true;
+ break;
+ }
+ }
+ }
+
auto filter = (mode == TKqpPhyUpsertIndexMode::UpdateOn) ? &inputColumnsSet : nullptr;
const auto indexes = BuildSecondaryIndexVector(table, pos, ctx, filter);
- auto checkedInput = CheckUniqueConstraint(inputRows, inputColumnsSet, table, indexes, pos, ctx);
+ auto checkedInput = CheckUniqueConstraint(inputRows, inputColumnsSet, checkOnlyGivenColumns, table, indexes, pos, ctx);
+
if (!checkedInput) {
return {};
}
@@ -463,6 +475,7 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
.Table(BuildTableMeta(table, pos, ctx))
.Input(tableUpsertRows)
.Columns(inputColumns)
+ .Settings(settings)
.Done();
TVector<TExprBase> effects;
@@ -543,7 +556,7 @@ TExprBase KqpBuildUpsertIndexStages(TExprBase node, TExprContext& ctx, const TKq
const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, upsert.Table().Path());
auto effects = KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode::Upsert, upsert.Input(), upsert.Columns(),
- table, upsert.Pos(), ctx);
+ table, upsert.Settings(), upsert.Pos(), ctx);
if (!effects) {
return node;
diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
index 92fd414df23..c7142edf73b 100644
--- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
@@ -356,6 +356,8 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
[&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) {
TKqpUpsertRows upsertRows(&node);
+ auto settings = TKqpUpsertRowsSettings::Parse(upsertRows);
+
const auto& tableMeta = ctx.GetTableMeta(upsertRows.Table());
auto rows = MkqlBuildExpr(upsertRows.Input().Ref(), buildCtx);
@@ -381,7 +383,7 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
TVector<TStringBuf> upsertColumns(upsertSet.begin(), upsertSet.end());
auto result = ctx.PgmBuilder().KqpUpsertRows(MakeTableId(upsertRows.Table()), rows,
- GetKqpColumns(tableMeta, upsertColumns, false));
+ GetKqpColumns(tableMeta, upsertColumns, false), settings.IsUpdate);
return result;
});
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp
index e2437fc33eb..dd928eb55df 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.cpp
+++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp
@@ -268,7 +268,7 @@ TRuntimeNode TKqpProgramBuilder::KqpLookupTable(const TTableId& tableId, const T
}
TRuntimeNode TKqpProgramBuilder::KqpUpsertRows(const TTableId& tableId, const TRuntimeNode& rows,
- const TArrayRef<TKqpTableColumn>& upsertColumns)
+ const TArrayRef<TKqpTableColumn>& upsertColumns, bool isUpdate)
{
auto streamType = AS_TYPE(TStreamType, rows.GetStaticType());
auto rowType = AS_TYPE(TStructType, streamType->GetItemType());
@@ -279,7 +279,7 @@ TRuntimeNode TKqpProgramBuilder::KqpUpsertRows(const TTableId& tableId, const TR
builder.Add(BuildTableIdLiteral(tableId, *this));
builder.Add(rows);
builder.Add(BuildColumnIndicesMap(*this, *rowType, upsertColumns));
-
+ builder.Add(this->NewDataLiteral<bool>(isUpdate));
return TRuntimeNode(builder.Build(), false);
}
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h b/ydb/core/kqp/runtime/kqp_program_builder.h
index 3c4825001f0..f52cea905f1 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.h
+++ b/ydb/core/kqp/runtime/kqp_program_builder.h
@@ -62,7 +62,7 @@ public:
const TArrayRef<TKqpTableColumn>& keyColumns, const TArrayRef<TKqpTableColumn>& columns);
TRuntimeNode KqpUpsertRows(const TTableId& tableId, const TRuntimeNode& rows,
- const TArrayRef<TKqpTableColumn>& upsertColumns);
+ const TArrayRef<TKqpTableColumn>& upsertColumns, bool isUpdate);
TRuntimeNode KqpDeleteRows(const TTableId& tableId, const TRuntimeNode& rows);
diff --git a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
index e58d78fc33c..330ee560d1f 100644
--- a/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_not_null_ut.cpp
@@ -9,6 +9,87 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;
+namespace {
+
+void CreateTableWithMultishardIndex(Tests::TClient& client, NKikimrSchemeOp::EIndexType type) {
+ const TString scheme = R"(Name: "MultiShardIndexed"
+ Columns { Name: "key" Type: "Uint64" NotNull: true }
+ Columns { Name: "fk" Type: "Uint32" NotNull: true }
+ Columns { Name: "fk2" Type: "Uint32" NotNull: true }
+ Columns { Name: "value" Type: "String" NotNull: true }
+ KeyColumnNames: ["key"]
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 3 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 100 } } } }
+ )";
+
+ NKikimrSchemeOp::TTableDescription desc;
+ bool parseOk = ::google::protobuf::TextFormat::ParseFromString(scheme, &desc);
+ UNIT_ASSERT(parseOk);
+
+ auto status = client.TClient::CreateTableWithUniformShardedIndex("/Root", desc, "index", {"fk", "fk2"}, type);
+ UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK);
+}
+
+void TestUpdateWithoutChangingNotNullColumn(TSession& session) {
+ { /* init table */
+ const auto query = Q_(R"(
+ UPSERT INTO t (id, val, created_on) VALUES
+ (123, 'xxx', 1);
+ )");
+
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { /* update not null column */
+ const auto query = Q_("UPDATE t SET val = 'a' WHERE id = 123;");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = session.ExecuteDataQuery(R"(
+ SELECT * FROM t;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[1u;123u;["a"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ { /* update not null column */
+ const auto query = Q_("UPDATE t SET val = 'a', created_on = NULL WHERE id = 123;");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
+
+ { /* update on not null column */
+ const auto query = Q_(R"(
+ $to_update = (
+ SELECT id, 'b' AS val FROM t
+ WHERE id = 123
+ );
+ UPDATE t ON SELECT * FROM $to_update;
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = session.ExecuteDataQuery(R"(
+ SELECT * FROM t;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[1u;123u;["b"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ { /* update on not null column */
+ const auto query = Q_(R"(
+ $to_update = (
+ SELECT id, 'b' AS val, NULL as created_on FROM t
+ WHERE id = 123
+ );
+ UPDATE t ON SELECT * FROM $to_update;
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
+}
+
+} // namespace
+
Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
Y_UNIT_TEST(CreateTableWithDisabledNotNullDataColumns) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false).SetEnableNotNullDataColumns(false));
@@ -481,6 +562,51 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
}
}
+ Y_UNIT_TEST(InsertFromSelect) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false)
+ .SetEnableNotNullDataColumns(true);
+
+ TKikimrRunner kikimr(settings);
+ auto client = kikimr.GetTableClient();
+ auto session = client.CreateSession().GetValueSync().GetSession();
+
+ {
+ const auto q1 = Q_(R"(
+ CREATE TABLE `/Root/TestInsertNotNull` (
+ Key Uint64,
+ Value String NOT NULL,
+ PRIMARY KEY (Key))
+ )");
+
+ auto result = session.ExecuteSchemeQuery(q1).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const auto q2 = Q_(R"(
+ CREATE TABLE `/Root/TestInsert` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key))
+ )");
+
+ result = session.ExecuteSchemeQuery(q2).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto query = Q_("INSERT INTO `/Root/TestInsert` (Key, Value) VALUES (1, NULL)");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { /* missing not null column */
+ const auto query = Q_("INSERT INTO `/Root/TestInsertNotNull` (Key, Value) SELECT Key, Value FROM `/Root/TestInsert`");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT(!result.IsSuccess());
+ UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
+ }
+ }
+
Y_UNIT_TEST(InsertNotNullPg) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
@@ -691,7 +817,6 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
}
}
-
Y_UNIT_TEST(UpdateNotNull) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false)
@@ -744,6 +869,274 @@ Y_UNIT_TEST_SUITE(KqpNotNullColumns) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_BAD_COLUMN_TYPE), result.GetIssues().ToString());
}
}
+
+ Y_UNIT_TEST(UpdateTable_DontChangeNotNull) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+
+ TKikimrRunner kikimr(settings);
+ auto client = kikimr.GetTableClient();
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ {
+ const auto query = Q_(R"(
+ CREATE TABLE t
+ (
+ id Uint64 NOT NULL,
+ val String,
+ created_on Uint64 NOT NULL,
+ PRIMARY KEY(id)
+ );
+ )");
+
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ TestUpdateWithoutChangingNotNullColumn(session);
+ }
+
+ Y_UNIT_TEST(UpdateTable_DontChangeNotNullWithIndex) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+
+ TKikimrRunner kikimr(settings);
+ auto client = kikimr.GetTableClient();
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ {
+ const auto query = Q_(R"(
+ CREATE TABLE t
+ (
+ id Uint64 NOT NULL,
+ val String,
+ created_on Uint64 NOT NULL,
+ PRIMARY KEY(id),
+ INDEX Index GLOBAL SYNC ON (id, val)
+ );
+ )");
+
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ TestUpdateWithoutChangingNotNullColumn(session);
+ }
+
+ Y_UNIT_TEST(UpdateTable_UniqIndex) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+ CreateTableWithMultishardIndex(kikimr.GetTestClient(), NKikimrSchemeOp::EIndexType::EIndexTypeGlobalUnique);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ {
+ const auto query = Q_(R"(
+ UPSERT INTO `/Root/MultiShardIndexed` (key, fk, fk2, value) VALUES
+ (123, 1, 1, 'v1');
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { /* update without not null column */
+ const auto query = Q_("UPDATE `/Root/MultiShardIndexed` SET value = 'a' WHERE key = 123;");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = session.ExecuteDataQuery(R"(
+ SELECT * FROM `/Root/MultiShardIndexed`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[1u;1u;123u;"a"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ { /* update on without not null column */
+ const auto query = Q_(R"(
+ $to_update = (
+ SELECT key, 'b' as value FROM `/Root/MultiShardIndexed`
+ WHERE key = 123
+ );
+ UPDATE `/Root/MultiShardIndexed` ON SELECT * FROM $to_update;
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = session.ExecuteDataQuery(R"(
+ SELECT * FROM `/Root/MultiShardIndexed`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[1u;1u;123u;"b"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ { /* same fk */
+ const auto query = Q_(R"(
+ UPSERT INTO `/Root/MultiShardIndexed` (key, fk, fk2, value) VALUES
+ (124, 1, 1, 'v2');
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+
+ { /* update not null column */
+ const auto query = Q_("UPDATE `/Root/MultiShardIndexed` SET value = 'a', fk = NULL WHERE key = 123;");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
+
+ { /* update on select */
+ const auto query = Q_(R"(
+ UPDATE `/Root/MultiShardIndexed` ON SELECT * FROM `/Root/MultiShardIndexed` WHERE key = 123;
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = session.ExecuteDataQuery(R"(
+ SELECT * FROM `/Root/MultiShardIndexed`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[1u;1u;123u;"b"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(UpdateTable_UniqIndexPg) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
+ auto setting = NKikimrKqp::TKqpSetting();
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetKqpSettings({setting});
+ TKikimrRunner kikimr(serverSettings.SetWithSampleTables(false));
+ auto db = kikimr.GetQueryClient();
+ auto tableClient = kikimr.GetTableClient();
+ auto settings = NYdb::NQuery::TExecuteQuerySettings().Syntax(NYdb::NQuery::ESyntax::Pg);
+ {
+ auto result = db.ExecuteQuery(R"(
+ CREATE TABLE t(
+ id int4 primary key,
+ value int4,
+ label text NOT NULL,
+ label2 text NOT NULL,
+ side int4 NOT NULL,
+ constraint uniq1 unique (value, label),
+ constraint uniq2 unique (label2)
+ );
+ )", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ INSERT INTO t VALUES (1, 1, 'label1_1', 'label2_1', 1), (2, 2, 'label1_2', 'label2_2', 2);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE t SET value = 100 WHERE id = 1;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = db.ExecuteQuery(R"(
+ SELECT * FROM t;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([["1";"100";"label1_1";"label2_1";"1"];["2";"2";"label1_2";"label2_2";"2"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE t SET label2 = 'label2_1' WHERE id = 1;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = db.ExecuteQuery(R"(
+ SELECT * FROM t;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([["1";"100";"label1_1";"label2_1";"1"];["2";"2";"label1_2";"label2_2";"2"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE t SET side = id + 1, label = 'new_label';
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = db.ExecuteQuery(R"(
+ SELECT * FROM t;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([["1";"100";"new_label";"label2_1";"2"];["2";"2";"new_label";"label2_2";"3"]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ {
+ auto result = db.ExecuteQuery(R"(
+ UPDATE t SET value = 100, label = 'new_label' WHERE id = 2;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ result = db.ExecuteQuery(R"(
+ UPDATE t SET label2 = 'new_label';
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(UpdateTable_Immediate) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+
+ TKikimrRunner kikimr(settings);
+ auto client = kikimr.GetTableClient();
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ {
+ const auto query = Q_(R"(
+ CREATE TABLE t
+ (
+ id Uint64 NOT NULL,
+ val String,
+ created_on Uint64 NOT NULL,
+ PRIMARY KEY(id),
+ INDEX Index GLOBAL SYNC ON (id, val)
+ );
+ )");
+
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ const auto query = Q_(R"(
+ UPSERT INTO t (id, val, created_on) VALUES
+ (123, 'xxx', 1),
+ (124, 'yyy', 2);
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto query = Q_(R"(
+ UPDATE t SET val = 'yyy' WHERE id = 123;
+ DELETE FROM t WHERE val = 'yyy';
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = session.ExecuteDataQuery(R"(
+ SELECT * FROM t;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ {
+ const auto query = Q_(R"(
+ UPSERT INTO t (id, val, created_on) VALUES
+ (123, 'xxx', 1),
+ (124, 'yyy', 2);
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ {
+ const auto query = Q_(R"(
+ UPDATE t SET created_on = 11;
+ DELETE FROM t WHERE val = 'xxx';
+ UPDATE t SET val = 'abc' WHERE created_on = 11;
+ )");
+ auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ result = session.ExecuteDataQuery(R"(
+ SELECT * FROM t;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[11u;124u;["abc"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
Y_UNIT_TEST(UpdateNotNullPg) {
auto settings = TKikimrSettings()
diff --git a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
index 60f8b835933..d94235e9ef9 100644
--- a/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_upsert_rows.cpp
@@ -169,12 +169,13 @@ private:
IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeFactoryContext& ctx,
TKqpDatashardComputeContext& computeCtx)
{
- MKQL_ENSURE_S(callable.GetInputsCount() >= 3);
+ MKQL_ENSURE_S(callable.GetInputsCount() >= 4);
auto tableNode = callable.GetInput(0);
auto rowsNode = callable.GetInput(1);
auto upsertColumnsNode = callable.GetInput(2);
-
+ auto isUpdateNode = callable.GetInput(3);
+ bool isUpdate = AS_VALUE(TDataLiteral, isUpdateNode)->AsValue().Get<bool>();
auto tableId = NKqp::ParseTableId(tableNode);
auto tableInfo = computeCtx.GetTable(tableId);
MKQL_ENSURE(tableInfo, "Table not found: " << tableId.PathId.ToString());
@@ -238,14 +239,16 @@ IComputationNode* WrapKqpUpsertRows(TCallable& callable, const TComputationNodeF
}
for (const auto& [_, column] : tableInfo->Columns) {
- if (column.NotNull) {
+ if (column.NotNull && !isUpdate) {
auto it = inputIndex.find(column.Name);
MKQL_ENSURE(it != inputIndex.end(),
"Not null column " << column.Name << " has to be specified in upsert");
- auto columnType = rowType->GetMemberType(it->second);
- MKQL_ENSURE(columnType->GetKind() != NMiniKQL::TType::EKind::Optional,
- "Not null column " << column.Name << " can't be optional");
+ if (it != inputIndex.end()) {
+ auto columnType = rowType->GetMemberType(it->second);
+ MKQL_ENSURE(columnType->GetKind() != NMiniKQL::TType::EKind::Optional,
+ "Not null column " << column.Name << " can't be optional");
+ }
}
}