aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-09-07 17:50:58 +0300
committerdcherednik <dcherednik@ydb.tech>2023-09-07 18:26:01 +0300
commitc15d4e831dbd20e89ff104be83c616d41836e25b (patch)
treec22d209a06a07c86fd2c14353363c2d9f722261d
parent1c1559c338a38442a4e4663780d79f829cb6a23a (diff)
downloadydb-c15d4e831dbd20e89ff104be83c616d41836e25b.tar.gz
Uniq constaint support for REPLACE, UPDATE ON KIKIMR-19064
-rw-r--r--ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp20
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h2
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp329
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp329
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h47
-rw-r--r--ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp103
-rw-r--r--ydb/core/kqp/opt/physical/effects/ya.make1
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp127
12 files changed, 616 insertions, 346 deletions
diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt
index d2266274164..5b954686cb3 100644
--- a/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt
@@ -24,6 +24,7 @@ target_sources(opt-physical-effects PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt
index 761997487ef..2cafa69cd7e 100644
--- a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt
@@ -25,6 +25,7 @@ target_sources(opt-physical-effects PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt
index 761997487ef..2cafa69cd7e 100644
--- a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt
@@ -25,6 +25,7 @@ target_sources(opt-physical-effects PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt
index d2266274164..5b954686cb3 100644
--- a/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt
@@ -24,6 +24,7 @@ target_sources(opt-physical-effects PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp
index ab36056a9ee..a8b3560f72e 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp
@@ -109,30 +109,26 @@ TMaybe<TCondenseInputResult> CondenseInput(const TExprBase& input, TExprContext&
};
}
-TMaybe<TCondenseInputResult> CondenseAndDeduplicateInput(const TExprBase& input, const TKikimrTableDescription& table,
+TCondenseInputResult DeduplicateInput(const TCondenseInputResult& condenseResult, const TKikimrTableDescription& table,
TExprContext& ctx)
{
- auto condenseResult = CondenseInput(input, ctx);
- if (!condenseResult) {
- return {};
- }
+ auto pos = condenseResult.Stream.Pos();
+ auto listArg = TCoArgument(ctx.NewArgument(pos, "list_arg"));
- auto listArg = TCoArgument(ctx.NewArgument(input.Pos(), "list_arg"));
-
- auto deduplicated = Build<TCoFlatMap>(ctx, input.Pos())
- .Input(condenseResult->Stream)
+ auto deduplicated = Build<TCoFlatMap>(ctx, pos)
+ .Input(condenseResult.Stream)
.Lambda()
.Args({listArg})
.Body<TCoJust>()
- .Input(RemoveDuplicateKeyFromInput(listArg, table, input.Pos(), ctx))
+ .Input(RemoveDuplicateKeyFromInput(listArg, table, pos, ctx))
.Build()
.Build()
.Done();
return TCondenseInputResult {
.Stream = deduplicated,
- .StageInputs = condenseResult->StageInputs,
- .StageArgs = condenseResult->StageArgs
+ .StageInputs = condenseResult.StageInputs,
+ .StageArgs = condenseResult.StageArgs
};
}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
index 3fc7bbf9d5c..e7ebdb014e4 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h
@@ -23,7 +23,7 @@ struct TCondenseInputResult {
TMaybe<TCondenseInputResult> CondenseInput(const NYql::NNodes::TExprBase& input, NYql::TExprContext& ctx);
-TMaybe<TCondenseInputResult> CondenseAndDeduplicateInput(const NYql::NNodes::TExprBase& input,
+TCondenseInputResult DeduplicateInput(const TCondenseInputResult& input,
const NYql::TKikimrTableDescription& table, NYql::TExprContext& ctx);
TMaybe<TCondenseInputResult> CondenseInputToDictByPk(const NYql::NNodes::TExprBase& input,
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
index 975744fdff3..db291902bbc 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp
@@ -1,5 +1,6 @@
#include "kqp_opt_phy_effects_rules.h"
#include "kqp_opt_phy_effects_impl.h"
+#include "kqp_opt_phy_uniq_helper.h"
namespace NKikimr::NKqp::NOpt {
@@ -7,332 +8,6 @@ using namespace NYql;
using namespace NYql::NDq;
using namespace NYql::NNodes;
-namespace {
-
-struct TUniqCheckNodes {
- using TIndexId = int;
- static constexpr TIndexId INVALID_INDEX_ID = -1;
- TExprNode::TPtr DictKeys;
- TExprNode::TPtr UniqCmp;
- TIndexId IndexId = INVALID_INDEX_ID;
-};
-
-TUniqCheckNodes MakeUniqCheckNodes(const TCoLambda& selector,
- const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx)
-{
- TUniqCheckNodes result;
- auto dict = Build<TCoToDict>(ctx, pos)
- .List(rowsListArg)
- .KeySelector(selector)
- .PayloadSelector()
- .Args({"stub"})
- .Body<TCoVoid>()
- .Build()
- .Build()
- .Settings()
- .Add().Build("One")
- .Add().Build("Hashed")
- .Build()
- .Done().Ptr();
-
- result.DictKeys = Build<TCoDictKeys>(ctx, pos)
- .Dict(dict)
- .Done().Ptr();
-
- result.UniqCmp = Build<TCoCmpEqual>(ctx, pos)
- .Left<TCoLength>()
- .List(rowsListArg)
- .Build()
- .Right<TCoLength>()
- .List(dict)
- .Build()
- .Done().Ptr();
-
- return result;
-}
-
-TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t index, const NYql::TKikimrTableMetadata& meta,
- TExprNode::TPtr _false, TPositionHandle pos, TExprContext& ctx)
-{
- auto lookupKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
- .Connection<TDqCnValue>()
- .Output()
- .Stage(computeKeysStage)
- .Index().Build(IntToString<10>(index))
- .Build()
- .Build()
- .Done();
-
-
- auto stage = Build<TDqStage>(ctx, pos)
- .Inputs()
- .Add(lookupKeysPrecompute)
- .Build()
- .Program()
- .Args({"keys_list"})
- .Body<TCoMap>()
- .Input<TCoTake>()
- .Input<TKqpLookupTable>()
- .Table(BuildTableMeta(meta, pos, ctx))
- .LookupKeys<TCoIterator>()
- .List("keys_list")
- .Build()
- .Columns()
- .Build()
- .Build()
- .Count<TCoUint64>()
- .Literal().Build("1")
- .Build()
- .Build()
- .Lambda()
- .Args({"row"})
- .Body<TCoJust>()
- .Input(_false)
- .Build()
- .Build()
- .Build()
- .Build()
- .Settings().Build()
- .Done();
-
- return Build<TDqCnUnionAll>(ctx, pos)
- .Output()
- .Stage(stage)
- .Index().Build("0")
- .Build()
- .Done();
-}
-
-class TUniqBuildHelper {
- static TVector<TUniqCheckNodes> Prepare(const TCoArgument& rowsListArg, const TKikimrTableDescription& table,
- TPositionHandle pos, TExprContext& ctx)
- {
-
- TVector<TUniqCheckNodes> checks;
- checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx));
-
- // make uniq check for each uniq constraint
- for (size_t i = 0; i < table.Metadata->Indexes.size(); i++) {
- if (table.Metadata->Indexes[i].State != TIndexDescription::EIndexState::Ready)
- continue;
- if (table.Metadata->Indexes[i].Type != TIndexDescription::EType::GlobalSyncUnique)
- continue;
-
- // Compatibility with PG semantic - allow multiple null in columns with unique constaint
- TVector<TCoAtom> skipNullColumns;
- skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());
- for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
- TCoAtom atom(ctx.NewAtom(pos, column));
- skipNullColumns.emplace_back(atom);
- }
-
- auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
- .Input(rowsListArg)
- .Members().Add(skipNullColumns).Build()
- .Done();
-
- checks.emplace_back(MakeUniqCheckNodes(MakeIndexPrefixKeySelector(table.Metadata->Indexes[i], pos, ctx), skipNull, pos, ctx));
- YQL_ENSURE(i < Max<TUniqCheckNodes::TIndexId>());
- checks.back().IndexId = i;
- }
- return checks;
- }
-
-public:
- TUniqBuildHelper(const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx)
- : RowsListArg(ctx.NewArgument(pos, "rows_list"))
- , Checks(Prepare(RowsListArg, table, pos, ctx))
- {}
-
- size_t GetChecksNum() const {
- return Checks.size();
- }
-
- TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult, TPositionHandle pos, TExprContext& ctx) const
- {
- // Number of items for output list 2 for each table + 1 for params itself
- const size_t nItems = Checks.size() * 2 + 1;
- TVector<TExprBase> types;
- types.reserve(nItems);
-
- types.emplace_back(
- Build<TCoTypeOf>(ctx, pos)
- .Value(RowsListArg)
- .Done()
- );
-
- for (size_t i = 0; i < Checks.size(); i++) {
- types.emplace_back(
- Build<TCoTypeOf>(ctx, pos)
- .Value(Checks[i].DictKeys)
- .Done()
- );
- types.emplace_back(
- Build<TCoTypeOf>(ctx, pos)
- .Value(Checks[i].UniqCmp)
- .Done()
- );
- }
-
- auto variantType = Build<TCoVariantType>(ctx, pos)
- .UnderlyingType<TCoTupleType>()
- .Add(types)
- .Build()
- .Done();
-
- TVector<TExprBase> variants;
- variants.reserve(nItems);
-
- variants.emplace_back(
- Build<TCoVariant>(ctx, pos)
- .Item(RowsListArg)
- .Index().Build("0")
- .VarType(variantType)
- .Done()
- );
-
- for (size_t i = 0, ch = 1; i < Checks.size(); i++) {
- variants.emplace_back(
- Build<TCoVariant>(ctx, pos)
- .Item(Checks[i].DictKeys)
- .Index().Build(IntToString<10>(ch++))
- .VarType(variantType)
- .Done()
- );
- variants.emplace_back(
- Build<TCoVariant>(ctx, pos)
- .Item(Checks[i].UniqCmp)
- .Index().Build(IntToString<10>(ch++))
- .VarType(variantType)
- .Done()
- );
- }
-
- return Build<TDqStage>(ctx, pos)
- .Inputs()
- .Add(condenseResult.StageInputs)
- .Build()
- .Program()
- .Args(condenseResult.StageArgs)
- .Body<TCoFlatMap>()
- .Input(condenseResult.Stream)
- .Lambda()
- .Args({RowsListArg})
- .Body<TCoAsList>()
- .Add(variants)
- .Build()
- .Build()
- .Build()
- .Build()
- .Settings().Build()
- .Done();
- }
-
- TDqPhyPrecompute CreateInputPrecompute(const TDqStage& computeKeysStage, TPositionHandle pos, TExprContext& ctx) {
- return Build<TDqPhyPrecompute>(ctx, pos)
- .Connection<TDqCnValue>()
- .Output()
- .Stage(computeKeysStage)
- .Index().Build("0")
- .Build()
- .Build()
- .Done();
- }
-
- TVector<TExprBase> CreateUniquePrecompute(const TDqStage& computeKeysStage, TPositionHandle pos, TExprContext& ctx) {
- TVector<TExprBase> uniquePrecomputes;
- uniquePrecomputes.reserve(Checks.size());
- for (size_t i = 0, output_index = 2; i < Checks.size(); i++, output_index += 2) {
- uniquePrecomputes.emplace_back(Build<TDqPhyPrecompute>(ctx, pos)
- .Connection<TDqCnValue>()
- .Output()
- .Stage(computeKeysStage)
- .Index().Build(IntToString<10>(output_index))
- .Build()
- .Build()
- .Done()
- );
- }
- return uniquePrecomputes;
- }
-
- struct TLookupNodes {
- TLookupNodes(size_t sz) {
- Stages.reserve(sz);
- Args.reserve(sz);
- }
- TVector<TExprBase> Stages;
- TVector<TCoArgument> Args;
- };
-
- TDqStage CreateLookupExistStage(const TDqStage& computeKeysStage, const TKikimrTableDescription& table,
- TExprNode::TPtr _true, TPositionHandle pos, TExprContext& ctx)
- {
- TLookupNodes lookupNodes(Checks.size());
-
- auto _false = MakeBool(pos, false, ctx);
-
- lookupNodes.Stages.emplace_back(
- // 1 is id of precompute key stage output for primary key
- CreateLookupStageWithConnection(computeKeysStage, 1, *table.Metadata, _false, pos, ctx)
- );
- lookupNodes.Args.emplace_back(
- Build<TCoArgument>(ctx, pos)
- .Name("arg0")
- .Done()
- );
-
- // For each index create lookup stage using computeKeysStage.
- // 3 is id of of precompute key stage output for index
- for (size_t i = 1, stage_out = 3; i < Checks.size(); i++, stage_out += 2) {
- const auto indexId = Checks[i].IndexId;
- YQL_ENSURE(indexId >=0);
- YQL_ENSURE((size_t)indexId < table.Metadata->SecondaryGlobalIndexMetadata.size());
- lookupNodes.Stages.emplace_back(
- CreateLookupStageWithConnection(computeKeysStage, stage_out,
- *(table.Metadata->SecondaryGlobalIndexMetadata[indexId]), _false, pos, ctx)
- );
- lookupNodes.Args.emplace_back(
- Build<TCoArgument>(ctx, pos)
- .Name(TString("arg") + IntToString<10>(i))
- .Done()
- );
- }
-
- return Build<TDqStage>(ctx, pos)
- .Inputs()
- .Add(lookupNodes.Stages)
- .Build()
- .Program()
- .Args(lookupNodes.Args)
- .Body<TCoCondense>()
- .Input<TCoToStream>()
- .Input<TCoExtend>()
- .Add(TVector<TExprBase>(lookupNodes.Args.begin(), lookupNodes.Args.end()))
- .Build()
- .Build()
- .State(_true)
- .SwitchHandler()
- .Args({"item", "state"})
- .Body(_false)
- .Build()
- .UpdateHandler()
- .Args({"item", "state"})
- .Body(_false)
- .Build()
- .Build()
- .Build()
- .Settings().Build()
- .Done();
- }
-
-private:
- TCoArgument RowsListArg;
- TVector<TUniqCheckNodes> Checks;
-};
-
-}
-
TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, const TKikimrTableDescription& table,
bool abortOnError, TPositionHandle pos, TExprContext& ctx)
{
@@ -341,7 +16,7 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons
return {};
}
- TUniqBuildHelper helper(table, pos, ctx);
+ TUniqBuildHelper helper(table, pos, ctx, false);
auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx);
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
new file mode 100644
index 00000000000..a115df7bb4d
--- /dev/null
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp
@@ -0,0 +1,329 @@
+#include "kqp_opt_phy_uniq_helper.h"
+
+#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
+#include <ydb/core/kqp/opt/kqp_opt_impl.h>
+
+using namespace NYql;
+using namespace NYql::NNodes;
+using namespace NKikimr::NKqp::NOpt;
+
+namespace {
+
+struct TLookupNodes {
+ TLookupNodes(size_t sz) {
+ Stages.reserve(sz);
+ Args.reserve(sz);
+ }
+ TVector<TExprBase> Stages;
+ TVector<TCoArgument> Args;
+};
+
+TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t index,
+ const NYql::TKikimrTableMetadata& meta, TExprNode::TPtr _false,
+ TPositionHandle pos, TExprContext& ctx)
+{
+ auto lookupKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
+ .Connection<TDqCnValue>()
+ .Output()
+ .Stage(computeKeysStage)
+ .Index().Build(IntToString<10>(index))
+ .Build()
+ .Build()
+ .Done();
+
+
+ auto stage = Build<TDqStage>(ctx, pos)
+ .Inputs()
+ .Add(lookupKeysPrecompute)
+ .Build()
+ .Program()
+ .Args({"keys_list"})
+ .Body<TCoMap>()
+ .Input<TCoTake>()
+ .Input<TKqpLookupTable>()
+ .Table(BuildTableMeta(meta, pos, ctx))
+ .LookupKeys<TCoIterator>()
+ .List("keys_list")
+ .Build()
+ .Columns()
+ .Build()
+ .Build()
+ .Count<TCoUint64>()
+ .Literal().Build("1")
+ .Build()
+ .Build()
+ .Lambda()
+ .Args({"row"})
+ .Body<TCoJust>()
+ .Input(_false)
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Done();
+
+ return Build<TDqCnUnionAll>(ctx, pos)
+ .Output()
+ .Stage(stage)
+ .Index().Build("0")
+ .Build()
+ .Done();
+}
+
+}
+
+TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg,
+ const TKikimrTableDescription& table,
+ TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
+{
+ TVector<TUniqCheckNodes> checks;
+
+ if (!skipPkCheck) {
+ checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx));
+ }
+
+ // make uniq check for each uniq constraint
+ for (size_t i = 0; i < table.Metadata->Indexes.size(); i++) {
+ if (table.Metadata->Indexes[i].State != TIndexDescription::EIndexState::Ready)
+ continue;
+ if (table.Metadata->Indexes[i].Type != TIndexDescription::EType::GlobalSyncUnique)
+ continue;
+
+ // Compatibility with PG semantic - allow multiple null in columns with unique constaint
+ TVector<TCoAtom> skipNullColumns;
+ skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size());
+ for (const auto& column : table.Metadata->Indexes[i].KeyColumns) {
+ TCoAtom atom(ctx.NewAtom(pos, column));
+ skipNullColumns.emplace_back(atom);
+ }
+
+ auto skipNull = Build<TCoSkipNullMembers>(ctx, pos)
+ .Input(rowsListArg)
+ .Members().Add(skipNullColumns).Build()
+ .Done();
+
+ checks.emplace_back(
+ MakeUniqCheckNodes(
+ MakeIndexPrefixKeySelector(table.Metadata->Indexes[i], pos, ctx), skipNull, pos, ctx));
+
+ YQL_ENSURE(i < Max<TUniqCheckNodes::TIndexId>());
+ checks.back().IndexId = i;
+ }
+
+ return checks;
+}
+
+TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table,
+ TPositionHandle pos, TExprContext& ctx, bool skipPkCheck)
+ : RowsListArg(ctx.NewArgument(pos, "rows_list"))
+ , Checks(Prepare(RowsListArg, table, pos, ctx, skipPkCheck))
+{}
+
+TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCoLambda& selector,
+ const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx)
+{
+ TUniqCheckNodes result;
+ auto dict = Build<TCoToDict>(ctx, pos)
+ .List(rowsListArg)
+ .KeySelector(selector)
+ .PayloadSelector()
+ .Args({"stub"})
+ .Body<TCoVoid>()
+ .Build()
+ .Build()
+ .Settings()
+ .Add().Build("One")
+ .Add().Build("Hashed")
+ .Build()
+ .Done().Ptr();
+
+ result.DictKeys = Build<TCoDictKeys>(ctx, pos)
+ .Dict(dict)
+ .Done().Ptr();
+
+ result.UniqCmp = Build<TCoCmpEqual>(ctx, pos)
+ .Left<TCoLength>()
+ .List(rowsListArg)
+ .Build()
+ .Right<TCoLength>()
+ .List(dict)
+ .Build()
+ .Done().Ptr();
+
+ return result;
+}
+
+size_t TUniqBuildHelper::GetChecksNum() const {
+ return Checks.size();
+}
+
+TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& condenseResult,
+ TPositionHandle pos, TExprContext& ctx) const
+{
+ // Number of items for output list 2 for each table + 1 for params itself
+ const size_t nItems = Checks.size() * 2 + 1;
+ TVector<TExprBase> types;
+ types.reserve(nItems);
+
+ types.emplace_back(
+ Build<TCoTypeOf>(ctx, pos)
+ .Value(RowsListArg)
+ .Done()
+ );
+
+ for (size_t i = 0; i < Checks.size(); i++) {
+ types.emplace_back(
+ Build<TCoTypeOf>(ctx, pos)
+ .Value(Checks[i].DictKeys)
+ .Done()
+ );
+ types.emplace_back(
+ Build<TCoTypeOf>(ctx, pos)
+ .Value(Checks[i].UniqCmp)
+ .Done()
+ );
+ }
+
+ auto variantType = Build<TCoVariantType>(ctx, pos)
+ .UnderlyingType<TCoTupleType>()
+ .Add(types)
+ .Build()
+ .Done();
+
+ TVector<TExprBase> variants;
+ variants.reserve(nItems);
+
+ variants.emplace_back(
+ Build<TCoVariant>(ctx, pos)
+ .Item(RowsListArg)
+ .Index().Build("0")
+ .VarType(variantType)
+ .Done()
+ );
+
+ for (size_t i = 0, ch = 1; i < Checks.size(); i++) {
+ variants.emplace_back(
+ Build<TCoVariant>(ctx, pos)
+ .Item(Checks[i].DictKeys)
+ .Index().Build(IntToString<10>(ch++))
+ .VarType(variantType)
+ .Done()
+ );
+ variants.emplace_back(
+ Build<TCoVariant>(ctx, pos)
+ .Item(Checks[i].UniqCmp)
+ .Index().Build(IntToString<10>(ch++))
+ .VarType(variantType)
+ .Done()
+ );
+ }
+
+ return Build<TDqStage>(ctx, pos)
+ .Inputs()
+ .Add(condenseResult.StageInputs)
+ .Build()
+ .Program()
+ .Args(condenseResult.StageArgs)
+ .Body<TCoFlatMap>()
+ .Input(condenseResult.Stream)
+ .Lambda()
+ .Args({RowsListArg})
+ .Body<TCoAsList>()
+ .Add(variants)
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Done();
+}
+
+TDqPhyPrecompute TUniqBuildHelper::CreateInputPrecompute(const TDqStage& computeKeysStage,
+ TPositionHandle pos, TExprContext& ctx) const
+{
+ return Build<TDqPhyPrecompute>(ctx, pos)
+ .Connection<TDqCnValue>()
+ .Output()
+ .Stage(computeKeysStage)
+ .Index().Build("0")
+ .Build()
+ .Build()
+ .Done();
+}
+
+TVector<TExprBase> TUniqBuildHelper::CreateUniquePrecompute(const TDqStage& computeKeysStage,
+ TPositionHandle pos, TExprContext& ctx) const
+{
+ TVector<TExprBase> uniquePrecomputes;
+ uniquePrecomputes.reserve(Checks.size());
+ for (size_t i = 0, output_index = 2; i < Checks.size(); i++, output_index += 2) {
+ uniquePrecomputes.emplace_back(Build<TDqPhyPrecompute>(ctx, pos)
+ .Connection<TDqCnValue>()
+ .Output()
+ .Stage(computeKeysStage)
+ .Index().Build(IntToString<10>(output_index))
+ .Build()
+ .Build()
+ .Done()
+ );
+ }
+ return uniquePrecomputes;
+}
+
+TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysStage,
+ const TKikimrTableDescription& table, TExprNode::TPtr _true, TPositionHandle pos, TExprContext& ctx) const
+{
+ TLookupNodes lookupNodes(Checks.size());
+
+ auto _false = MakeBool(pos, false, ctx);
+
+ // 0 output - input stream itself so start with output 1
+ // Each check produces 2 outputs
+ for (size_t i = 0, stage_out = 1; i < Checks.size(); i++, stage_out += 2) {
+ const auto indexId = Checks[i].IndexId;
+ if (indexId == TUniqCheckNodes::NOT_INDEX_ID) {
+ lookupNodes.Stages.emplace_back(
+ CreateLookupStageWithConnection(computeKeysStage, stage_out, *table.Metadata, _false, pos, ctx)
+ );
+ } else {
+ YQL_ENSURE((size_t)indexId < table.Metadata->SecondaryGlobalIndexMetadata.size());
+ lookupNodes.Stages.emplace_back(
+ CreateLookupStageWithConnection(computeKeysStage, stage_out,
+ *(table.Metadata->SecondaryGlobalIndexMetadata[indexId]), _false, pos, ctx)
+ );
+ }
+
+ lookupNodes.Args.emplace_back(
+ Build<TCoArgument>(ctx, pos)
+ .Name(TString("arg") + IntToString<10>(i))
+ .Done()
+ );
+ }
+
+ return Build<TDqStage>(ctx, pos)
+ .Inputs()
+ .Add(lookupNodes.Stages)
+ .Build()
+ .Program()
+ .Args(lookupNodes.Args)
+ .Body<TCoCondense>()
+ .Input<TCoToStream>()
+ .Input<TCoExtend>()
+ .Add(TVector<TExprBase>(lookupNodes.Args.begin(), lookupNodes.Args.end()))
+ .Build()
+ .Build()
+ .State(_true)
+ .SwitchHandler()
+ .Args({"item", "state"})
+ .Body(_false)
+ .Build()
+ .UpdateHandler()
+ .Args({"item", "state"})
+ .Body(_false)
+ .Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Done();
+}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
new file mode 100644
index 00000000000..b580fdd8b5b
--- /dev/null
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include "kqp_opt_phy_effects_impl.h"
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+
+namespace NYql {
+ class TKikimrTableDescription;
+}
+
+namespace NKikimr::NKqp::NOpt {
+
+class TUniqBuildHelper {
+public:
+ TUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos,
+ NYql::TExprContext& ctx, bool skipPkCheck);
+ size_t GetChecksNum() const;
+
+ NYql::NNodes::TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult,
+ NYql::TPositionHandle pos, NYql::TExprContext& ctx) const;
+ NYql::NNodes::TDqPhyPrecompute CreateInputPrecompute(const NYql::NNodes::TDqStage& computeKeysStage,
+ NYql::TPositionHandle pos, NYql::TExprContext& ctx) const;
+ TVector<NYql::NNodes::TExprBase> CreateUniquePrecompute(const NYql::NNodes::TDqStage& computeKeysStage,
+ NYql::TPositionHandle pos, NYql::TExprContext& ctx) const;
+ NYql::NNodes::TDqStage CreateLookupExistStage(const NYql::NNodes::TDqStage& computeKeysStage,
+ const NYql::TKikimrTableDescription& table, NYql::TExprNode::TPtr _true,
+ NYql::TPositionHandle pos, NYql::TExprContext& ctx) const;
+
+private:
+ struct TUniqCheckNodes {
+ using TIndexId = int;
+ static constexpr TIndexId NOT_INDEX_ID = -1;
+ NYql::TExprNode::TPtr DictKeys;
+ NYql::TExprNode::TPtr UniqCmp;
+ TIndexId IndexId = NOT_INDEX_ID;
+ };
+
+ static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector,
+ const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx);
+ static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg,
+ const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx,
+ bool skipPkCheck);
+
+ NYql::NNodes::TCoArgument RowsListArg;
+ TVector<TUniqCheckNodes> Checks;
+};
+
+}
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
index 58de519dfdf..e89cb670444 100644
--- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
+++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
@@ -1,5 +1,6 @@
#include "kqp_opt_phy_effects_rules.h"
#include "kqp_opt_phy_effects_impl.h"
+#include "kqp_opt_phy_uniq_helper.h"
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
@@ -300,6 +301,100 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
.Done();
}
+TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx)
+{
+ auto condenseResult = CondenseInput(inputRows, ctx);
+ if (!condenseResult) {
+ return {};
+ }
+
+ TUniqBuildHelper helper(table, pos, ctx, true);
+ if (helper.GetChecksNum() == 0) {
+ return condenseResult;
+ }
+
+ auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
+ auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx);
+ auto uniquePrecomputes = helper.CreateUniquePrecompute(computeKeysStage, pos, ctx);
+
+ auto _true = MakeBool(pos, true, ctx);
+
+ auto aggrStage = helper.CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx);
+
+ // Returns <bool>: <true> - no existing keys, <false> - at least one key exists
+ auto noExistingKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
+ .Connection<TDqCnValue>()
+ .Output()
+ .Stage(aggrStage)
+ .Index().Build("0")
+ .Build()
+ .Build()
+ .Done();
+
+ TCoArgument inputRowList(ctx.NewArgument(pos, "rows_list"));
+ TCoArgument noExistingKeysArg(ctx.NewArgument(pos, "no_existing_keys"));
+
+ struct TUniqueCheckNodes {
+ TUniqueCheckNodes(size_t sz) {
+ Bodies.reserve(sz);
+ Args.reserve(sz);
+ }
+ TVector<TExprNode::TPtr> Bodies;
+ TVector<TCoArgument> Args;
+ } uniqueCheckNodes(helper.GetChecksNum());
+
+ for (size_t i = 0; i < helper.GetChecksNum(); i++) {
+ uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique"));
+ uniqueCheckNodes.Bodies.emplace_back(Build<TKqpEnsure>(ctx, pos)
+ .Value(_true)
+ .Predicate(uniqueCheckNodes.Args.back())
+ .IssueCode().Build(ToString((ui32) TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION))
+ .Message(MakeMessage("Duplicated keys found.", pos, ctx))
+ .Done().Ptr()
+ );
+ }
+
+ auto noExistingKeysCheck = Build<TKqpEnsure>(ctx, pos)
+ .Value(_true)
+ .Predicate(noExistingKeysArg)
+ .IssueCode().Build(ToString((ui32) TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION))
+ .Message(MakeMessage("Conflict with existing key.", pos, ctx))
+ .Done();
+
+ auto body = Build<TCoToStream>(ctx, pos)
+ .Input<TCoJust>()
+ .Input<TCoIfStrict>()
+ .Predicate<TCoAnd>()
+ .Add(uniqueCheckNodes.Bodies)
+ .Add(noExistingKeysCheck)
+ .Build()
+ .ThenValue(inputRowList)
+ .ElseValue<TCoList>()
+ .ListType(ExpandType(pos, *inputRows.Ref().GetTypeAnn(), ctx))
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+
+ TVector<NYql::NNodes::TCoArgument> stageArgs;
+ stageArgs.reserve(uniqueCheckNodes.Args.size() + 2);
+ stageArgs.emplace_back(inputRowList);
+ stageArgs.insert(stageArgs.end(), uniqueCheckNodes.Args.begin(), uniqueCheckNodes.Args.end());
+ stageArgs.emplace_back(noExistingKeysArg);
+
+ TVector<TExprBase> stageInputs;
+ stageInputs.reserve(uniquePrecomputes.size() + 2);
+ stageInputs.emplace_back(inputPrecompute);
+ stageInputs.insert(stageInputs.end(), uniquePrecomputes.begin(), uniquePrecomputes.end());
+ stageInputs.emplace_back(noExistingKeysPrecompute);
+
+ return TCondenseInputResult {
+ .Stream = body,
+ .StageInputs = std::move(stageInputs),
+ .StageArgs = std::move(stageArgs)
+ };
+}
+
} // namespace
TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const TExprBase& inputRows,
@@ -316,12 +411,14 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
const auto& pk = table.Metadata->KeyColumnNames;
- auto condenseInputResult = CondenseAndDeduplicateInput(inputRows, table, ctx);
- if (!condenseInputResult) {
+ auto checkedInput = CheckUniqueConstraint(inputRows, table, pos, ctx);
+ if (!checkedInput) {
return {};
}
- auto inputRowsAndKeys = PrecomputeRowsAndKeys(*condenseInputResult, table, pos, ctx);
+ auto condenseInputResult = DeduplicateInput(checkedInput.GetRef(), table, ctx);
+
+ auto inputRowsAndKeys = PrecomputeRowsAndKeys(condenseInputResult, table, pos, ctx);
THashSet<TStringBuf> inputColumnsSet;
for (const auto& column : inputColumns) {
diff --git a/ydb/core/kqp/opt/physical/effects/ya.make b/ydb/core/kqp/opt/physical/effects/ya.make
index 69eceb85dad..d95dff71cc0 100644
--- a/ydb/core/kqp/opt/physical/effects/ya.make
+++ b/ydb/core/kqp/opt/physical/effects/ya.make
@@ -6,6 +6,7 @@ SRCS(
kqp_opt_phy_indexes.cpp
kqp_opt_phy_insert_index.cpp
kqp_opt_phy_insert.cpp
+ kqp_opt_phy_uniq_helper.cpp
kqp_opt_phy_update_index.cpp
kqp_opt_phy_update.cpp
kqp_opt_phy_upsert_index.cpp
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
index fb97a0035f5..f7bb680a4d7 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
@@ -25,13 +25,14 @@ const NKikimrSchemeOp::EIndexType IG_UNIQUE = NKikimrSchemeOp::EIndexType::EInde
NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query) {
const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
return session.ExecuteDataQuery(query, txSettings,
- TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync();
+ TExecDataQuerySettings().KeepInQueryCache(true).CollectQueryStats(ECollectQueryStatsMode::Basic)).ExtractValueSync();
}
NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, TParams& params) {
const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
+
return session.ExecuteDataQuery(query, txSettings, params,
- TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync();
+ TExecDataQuerySettings().KeepInQueryCache(true).CollectQueryStats(ECollectQueryStatsMode::Basic)).ExtractValueSync();
}
void CreateTableWithMultishardIndex(Tests::TClient& client, NKikimrSchemeOp::EIndexType type) {
@@ -186,6 +187,68 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) {
}
}
+ Y_UNIT_TEST(ReplaceFkAlreadyExist) {
+ TKikimrRunner kikimr(SyntaxV1Settings());
+ CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ FillTable(session);
+
+ {
+ const TString query(Q_(R"(
+ REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES
+ (2, 1000000000, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query(Q_(R"(
+ REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES
+ (2, 1000000000, "v1"),
+ (2, 1000000001, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(UpdateOnFkAlreadyExist) {
+ TKikimrRunner kikimr(SyntaxV1Settings());
+ CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ FillTable(session);
+
+ {
+ const TString query(Q_(R"(
+ UPDATE `/Root/MultiShardIndexed` ON (key, fk, value) VALUES
+ (2, 1000000000, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query(Q_(R"(
+ UPDATE `/Root/MultiShardIndexed` ON (key, fk, value) VALUES
+ (2, 1000000000, "v1"),
+ (2, 1000000001, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+ }
+
Y_UNIT_TEST(InsertFkPkOverlap) {
TKikimrRunner kikimr(SyntaxV1Settings());
CreateTableWithMultishardIndexComplexFkPk(kikimr.GetTestClient(), IG_UNIQUE);
@@ -204,7 +267,6 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) {
}
}
-
Y_UNIT_TEST(InsertNullInPk) {
TKikimrRunner kikimr(SyntaxV1Settings());
CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
@@ -420,6 +482,65 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) {
}
}
+
+ Y_UNIT_TEST(ReplaceFkDuplicate) {
+ TKikimrRunner kikimr(SyntaxV1Settings());
+ CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ FillTable(session);
+
+ {
+ const TString query(Q_(R"(
+ REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES
+ (1173915, 1230000001, "v1"),
+ (1173916, 1230000001, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query(Q_(R"(
+ REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES
+ (1173915, 1230000001, "v1"),
+ (1173916, 1230000002, "v1"),
+ (1173915, 1230000001, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query(Q_(R"(
+ REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES
+ (1173915, NULL, "v1"),
+ (1173916, NULL, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query(Q_(R"(
+ REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES
+ (1173917, NULL, "v1"),
+ (1173917, NULL, "v1");
+ )"));
+
+ auto result = ExecuteDataQuery(session, query);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable");
+ const TString expected = R"([[#;[1173915u]];[#;[1173916u]];[#;[1173917u]];[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])";
+ UNIT_ASSERT_VALUES_EQUAL(yson, expected);
+ }
+ }
}
Y_UNIT_TEST_SUITE(KqpMultishardIndex) {