diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-09-07 17:50:58 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-09-07 18:26:01 +0300 |
commit | c15d4e831dbd20e89ff104be83c616d41836e25b (patch) | |
tree | c22d209a06a07c86fd2c14353363c2d9f722261d | |
parent | 1c1559c338a38442a4e4663780d79f829cb6a23a (diff) | |
download | ydb-c15d4e831dbd20e89ff104be83c616d41836e25b.tar.gz |
Uniq constaint support for REPLACE, UPDATE ON KIKIMR-19064
12 files changed, 616 insertions, 346 deletions
diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt index d2266274164..5b954686cb3 100644 --- a/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.darwin-x86_64.txt @@ -24,6 +24,7 @@ target_sources(opt-physical-effects PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt index 761997487ef..2cafa69cd7e 100644 --- a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-aarch64.txt @@ -25,6 +25,7 @@ target_sources(opt-physical-effects PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt index 761997487ef..2cafa69cd7e 100644 --- a/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.linux-x86_64.txt @@ -25,6 +25,7 @@ target_sources(opt-physical-effects PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp diff --git a/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt index d2266274164..5b954686cb3 100644 --- a/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/opt/physical/effects/CMakeLists.windows-x86_64.txt @@ -24,6 +24,7 @@ target_sources(opt-physical-effects PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_update.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp index ab36056a9ee..a8b3560f72e 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp @@ -109,30 +109,26 @@ TMaybe<TCondenseInputResult> CondenseInput(const TExprBase& input, TExprContext& }; } -TMaybe<TCondenseInputResult> CondenseAndDeduplicateInput(const TExprBase& input, const TKikimrTableDescription& table, +TCondenseInputResult DeduplicateInput(const TCondenseInputResult& condenseResult, const TKikimrTableDescription& table, TExprContext& ctx) { - auto condenseResult = CondenseInput(input, ctx); - if (!condenseResult) { - return {}; - } + auto pos = condenseResult.Stream.Pos(); + auto listArg = TCoArgument(ctx.NewArgument(pos, "list_arg")); - auto listArg = TCoArgument(ctx.NewArgument(input.Pos(), "list_arg")); - - auto deduplicated = Build<TCoFlatMap>(ctx, input.Pos()) - .Input(condenseResult->Stream) + auto deduplicated = Build<TCoFlatMap>(ctx, pos) + .Input(condenseResult.Stream) .Lambda() .Args({listArg}) .Body<TCoJust>() - .Input(RemoveDuplicateKeyFromInput(listArg, table, input.Pos(), ctx)) + .Input(RemoveDuplicateKeyFromInput(listArg, table, pos, ctx)) .Build() .Build() .Done(); return TCondenseInputResult { .Stream = deduplicated, - .StageInputs = condenseResult->StageInputs, - .StageArgs = condenseResult->StageArgs + .StageInputs = condenseResult.StageInputs, + .StageArgs = condenseResult.StageArgs }; } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h index 3fc7bbf9d5c..e7ebdb014e4 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h @@ -23,7 +23,7 @@ struct TCondenseInputResult { TMaybe<TCondenseInputResult> CondenseInput(const NYql::NNodes::TExprBase& input, NYql::TExprContext& ctx); -TMaybe<TCondenseInputResult> CondenseAndDeduplicateInput(const NYql::NNodes::TExprBase& input, +TCondenseInputResult DeduplicateInput(const TCondenseInputResult& input, const NYql::TKikimrTableDescription& table, NYql::TExprContext& ctx); TMaybe<TCondenseInputResult> CondenseInputToDictByPk(const NYql::NNodes::TExprBase& input, diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp index 975744fdff3..db291902bbc 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert.cpp @@ -1,5 +1,6 @@ #include "kqp_opt_phy_effects_rules.h" #include "kqp_opt_phy_effects_impl.h" +#include "kqp_opt_phy_uniq_helper.h" namespace NKikimr::NKqp::NOpt { @@ -7,332 +8,6 @@ using namespace NYql; using namespace NYql::NDq; using namespace NYql::NNodes; -namespace { - -struct TUniqCheckNodes { - using TIndexId = int; - static constexpr TIndexId INVALID_INDEX_ID = -1; - TExprNode::TPtr DictKeys; - TExprNode::TPtr UniqCmp; - TIndexId IndexId = INVALID_INDEX_ID; -}; - -TUniqCheckNodes MakeUniqCheckNodes(const TCoLambda& selector, - const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx) -{ - TUniqCheckNodes result; - auto dict = Build<TCoToDict>(ctx, pos) - .List(rowsListArg) - .KeySelector(selector) - .PayloadSelector() - .Args({"stub"}) - .Body<TCoVoid>() - .Build() - .Build() - .Settings() - .Add().Build("One") - .Add().Build("Hashed") - .Build() - .Done().Ptr(); - - result.DictKeys = Build<TCoDictKeys>(ctx, pos) - .Dict(dict) - .Done().Ptr(); - - result.UniqCmp = Build<TCoCmpEqual>(ctx, pos) - .Left<TCoLength>() - .List(rowsListArg) - .Build() - .Right<TCoLength>() - .List(dict) - .Build() - .Done().Ptr(); - - return result; -} - -TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t index, const NYql::TKikimrTableMetadata& meta, - TExprNode::TPtr _false, TPositionHandle pos, TExprContext& ctx) -{ - auto lookupKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos) - .Connection<TDqCnValue>() - .Output() - .Stage(computeKeysStage) - .Index().Build(IntToString<10>(index)) - .Build() - .Build() - .Done(); - - - auto stage = Build<TDqStage>(ctx, pos) - .Inputs() - .Add(lookupKeysPrecompute) - .Build() - .Program() - .Args({"keys_list"}) - .Body<TCoMap>() - .Input<TCoTake>() - .Input<TKqpLookupTable>() - .Table(BuildTableMeta(meta, pos, ctx)) - .LookupKeys<TCoIterator>() - .List("keys_list") - .Build() - .Columns() - .Build() - .Build() - .Count<TCoUint64>() - .Literal().Build("1") - .Build() - .Build() - .Lambda() - .Args({"row"}) - .Body<TCoJust>() - .Input(_false) - .Build() - .Build() - .Build() - .Build() - .Settings().Build() - .Done(); - - return Build<TDqCnUnionAll>(ctx, pos) - .Output() - .Stage(stage) - .Index().Build("0") - .Build() - .Done(); -} - -class TUniqBuildHelper { - static TVector<TUniqCheckNodes> Prepare(const TCoArgument& rowsListArg, const TKikimrTableDescription& table, - TPositionHandle pos, TExprContext& ctx) - { - - TVector<TUniqCheckNodes> checks; - checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx)); - - // make uniq check for each uniq constraint - for (size_t i = 0; i < table.Metadata->Indexes.size(); i++) { - if (table.Metadata->Indexes[i].State != TIndexDescription::EIndexState::Ready) - continue; - if (table.Metadata->Indexes[i].Type != TIndexDescription::EType::GlobalSyncUnique) - continue; - - // Compatibility with PG semantic - allow multiple null in columns with unique constaint - TVector<TCoAtom> skipNullColumns; - skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size()); - for (const auto& column : table.Metadata->Indexes[i].KeyColumns) { - TCoAtom atom(ctx.NewAtom(pos, column)); - skipNullColumns.emplace_back(atom); - } - - auto skipNull = Build<TCoSkipNullMembers>(ctx, pos) - .Input(rowsListArg) - .Members().Add(skipNullColumns).Build() - .Done(); - - checks.emplace_back(MakeUniqCheckNodes(MakeIndexPrefixKeySelector(table.Metadata->Indexes[i], pos, ctx), skipNull, pos, ctx)); - YQL_ENSURE(i < Max<TUniqCheckNodes::TIndexId>()); - checks.back().IndexId = i; - } - return checks; - } - -public: - TUniqBuildHelper(const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) - : RowsListArg(ctx.NewArgument(pos, "rows_list")) - , Checks(Prepare(RowsListArg, table, pos, ctx)) - {} - - size_t GetChecksNum() const { - return Checks.size(); - } - - TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult, TPositionHandle pos, TExprContext& ctx) const - { - // Number of items for output list 2 for each table + 1 for params itself - const size_t nItems = Checks.size() * 2 + 1; - TVector<TExprBase> types; - types.reserve(nItems); - - types.emplace_back( - Build<TCoTypeOf>(ctx, pos) - .Value(RowsListArg) - .Done() - ); - - for (size_t i = 0; i < Checks.size(); i++) { - types.emplace_back( - Build<TCoTypeOf>(ctx, pos) - .Value(Checks[i].DictKeys) - .Done() - ); - types.emplace_back( - Build<TCoTypeOf>(ctx, pos) - .Value(Checks[i].UniqCmp) - .Done() - ); - } - - auto variantType = Build<TCoVariantType>(ctx, pos) - .UnderlyingType<TCoTupleType>() - .Add(types) - .Build() - .Done(); - - TVector<TExprBase> variants; - variants.reserve(nItems); - - variants.emplace_back( - Build<TCoVariant>(ctx, pos) - .Item(RowsListArg) - .Index().Build("0") - .VarType(variantType) - .Done() - ); - - for (size_t i = 0, ch = 1; i < Checks.size(); i++) { - variants.emplace_back( - Build<TCoVariant>(ctx, pos) - .Item(Checks[i].DictKeys) - .Index().Build(IntToString<10>(ch++)) - .VarType(variantType) - .Done() - ); - variants.emplace_back( - Build<TCoVariant>(ctx, pos) - .Item(Checks[i].UniqCmp) - .Index().Build(IntToString<10>(ch++)) - .VarType(variantType) - .Done() - ); - } - - return Build<TDqStage>(ctx, pos) - .Inputs() - .Add(condenseResult.StageInputs) - .Build() - .Program() - .Args(condenseResult.StageArgs) - .Body<TCoFlatMap>() - .Input(condenseResult.Stream) - .Lambda() - .Args({RowsListArg}) - .Body<TCoAsList>() - .Add(variants) - .Build() - .Build() - .Build() - .Build() - .Settings().Build() - .Done(); - } - - TDqPhyPrecompute CreateInputPrecompute(const TDqStage& computeKeysStage, TPositionHandle pos, TExprContext& ctx) { - return Build<TDqPhyPrecompute>(ctx, pos) - .Connection<TDqCnValue>() - .Output() - .Stage(computeKeysStage) - .Index().Build("0") - .Build() - .Build() - .Done(); - } - - TVector<TExprBase> CreateUniquePrecompute(const TDqStage& computeKeysStage, TPositionHandle pos, TExprContext& ctx) { - TVector<TExprBase> uniquePrecomputes; - uniquePrecomputes.reserve(Checks.size()); - for (size_t i = 0, output_index = 2; i < Checks.size(); i++, output_index += 2) { - uniquePrecomputes.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) - .Connection<TDqCnValue>() - .Output() - .Stage(computeKeysStage) - .Index().Build(IntToString<10>(output_index)) - .Build() - .Build() - .Done() - ); - } - return uniquePrecomputes; - } - - struct TLookupNodes { - TLookupNodes(size_t sz) { - Stages.reserve(sz); - Args.reserve(sz); - } - TVector<TExprBase> Stages; - TVector<TCoArgument> Args; - }; - - TDqStage CreateLookupExistStage(const TDqStage& computeKeysStage, const TKikimrTableDescription& table, - TExprNode::TPtr _true, TPositionHandle pos, TExprContext& ctx) - { - TLookupNodes lookupNodes(Checks.size()); - - auto _false = MakeBool(pos, false, ctx); - - lookupNodes.Stages.emplace_back( - // 1 is id of precompute key stage output for primary key - CreateLookupStageWithConnection(computeKeysStage, 1, *table.Metadata, _false, pos, ctx) - ); - lookupNodes.Args.emplace_back( - Build<TCoArgument>(ctx, pos) - .Name("arg0") - .Done() - ); - - // For each index create lookup stage using computeKeysStage. - // 3 is id of of precompute key stage output for index - for (size_t i = 1, stage_out = 3; i < Checks.size(); i++, stage_out += 2) { - const auto indexId = Checks[i].IndexId; - YQL_ENSURE(indexId >=0); - YQL_ENSURE((size_t)indexId < table.Metadata->SecondaryGlobalIndexMetadata.size()); - lookupNodes.Stages.emplace_back( - CreateLookupStageWithConnection(computeKeysStage, stage_out, - *(table.Metadata->SecondaryGlobalIndexMetadata[indexId]), _false, pos, ctx) - ); - lookupNodes.Args.emplace_back( - Build<TCoArgument>(ctx, pos) - .Name(TString("arg") + IntToString<10>(i)) - .Done() - ); - } - - return Build<TDqStage>(ctx, pos) - .Inputs() - .Add(lookupNodes.Stages) - .Build() - .Program() - .Args(lookupNodes.Args) - .Body<TCoCondense>() - .Input<TCoToStream>() - .Input<TCoExtend>() - .Add(TVector<TExprBase>(lookupNodes.Args.begin(), lookupNodes.Args.end())) - .Build() - .Build() - .State(_true) - .SwitchHandler() - .Args({"item", "state"}) - .Body(_false) - .Build() - .UpdateHandler() - .Args({"item", "state"}) - .Body(_false) - .Build() - .Build() - .Build() - .Settings().Build() - .Done(); - } - -private: - TCoArgument RowsListArg; - TVector<TUniqCheckNodes> Checks; -}; - -} - TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, const TKikimrTableDescription& table, bool abortOnError, TPositionHandle pos, TExprContext& ctx) { @@ -341,7 +16,7 @@ TMaybeNode<TDqCnUnionAll> MakeConditionalInsertRows(const TExprBase& input, cons return {}; } - TUniqBuildHelper helper(table, pos, ctx); + TUniqBuildHelper helper(table, pos, ctx, false); auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx); auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp new file mode 100644 index 00000000000..a115df7bb4d --- /dev/null +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp @@ -0,0 +1,329 @@ +#include "kqp_opt_phy_uniq_helper.h" + +#include <ydb/core/kqp/provider/yql_kikimr_provider.h> +#include <ydb/core/kqp/opt/kqp_opt_impl.h> + +using namespace NYql; +using namespace NYql::NNodes; +using namespace NKikimr::NKqp::NOpt; + +namespace { + +struct TLookupNodes { + TLookupNodes(size_t sz) { + Stages.reserve(sz); + Args.reserve(sz); + } + TVector<TExprBase> Stages; + TVector<TCoArgument> Args; +}; + +TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t index, + const NYql::TKikimrTableMetadata& meta, TExprNode::TPtr _false, + TPositionHandle pos, TExprContext& ctx) +{ + auto lookupKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(computeKeysStage) + .Index().Build(IntToString<10>(index)) + .Build() + .Build() + .Done(); + + + auto stage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(lookupKeysPrecompute) + .Build() + .Program() + .Args({"keys_list"}) + .Body<TCoMap>() + .Input<TCoTake>() + .Input<TKqpLookupTable>() + .Table(BuildTableMeta(meta, pos, ctx)) + .LookupKeys<TCoIterator>() + .List("keys_list") + .Build() + .Columns() + .Build() + .Build() + .Count<TCoUint64>() + .Literal().Build("1") + .Build() + .Build() + .Lambda() + .Args({"row"}) + .Body<TCoJust>() + .Input(_false) + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); + + return Build<TDqCnUnionAll>(ctx, pos) + .Output() + .Stage(stage) + .Index().Build("0") + .Build() + .Done(); +} + +} + +TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg, + const TKikimrTableDescription& table, + TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) +{ + TVector<TUniqCheckNodes> checks; + + if (!skipPkCheck) { + checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx)); + } + + // make uniq check for each uniq constraint + for (size_t i = 0; i < table.Metadata->Indexes.size(); i++) { + if (table.Metadata->Indexes[i].State != TIndexDescription::EIndexState::Ready) + continue; + if (table.Metadata->Indexes[i].Type != TIndexDescription::EType::GlobalSyncUnique) + continue; + + // Compatibility with PG semantic - allow multiple null in columns with unique constaint + TVector<TCoAtom> skipNullColumns; + skipNullColumns.reserve(table.Metadata->Indexes[i].KeyColumns.size()); + for (const auto& column : table.Metadata->Indexes[i].KeyColumns) { + TCoAtom atom(ctx.NewAtom(pos, column)); + skipNullColumns.emplace_back(atom); + } + + auto skipNull = Build<TCoSkipNullMembers>(ctx, pos) + .Input(rowsListArg) + .Members().Add(skipNullColumns).Build() + .Done(); + + checks.emplace_back( + MakeUniqCheckNodes( + MakeIndexPrefixKeySelector(table.Metadata->Indexes[i], pos, ctx), skipNull, pos, ctx)); + + YQL_ENSURE(i < Max<TUniqCheckNodes::TIndexId>()); + checks.back().IndexId = i; + } + + return checks; +} + +TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, + TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) + : RowsListArg(ctx.NewArgument(pos, "rows_list")) + , Checks(Prepare(RowsListArg, table, pos, ctx, skipPkCheck)) +{} + +TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCoLambda& selector, + const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx) +{ + TUniqCheckNodes result; + auto dict = Build<TCoToDict>(ctx, pos) + .List(rowsListArg) + .KeySelector(selector) + .PayloadSelector() + .Args({"stub"}) + .Body<TCoVoid>() + .Build() + .Build() + .Settings() + .Add().Build("One") + .Add().Build("Hashed") + .Build() + .Done().Ptr(); + + result.DictKeys = Build<TCoDictKeys>(ctx, pos) + .Dict(dict) + .Done().Ptr(); + + result.UniqCmp = Build<TCoCmpEqual>(ctx, pos) + .Left<TCoLength>() + .List(rowsListArg) + .Build() + .Right<TCoLength>() + .List(dict) + .Build() + .Done().Ptr(); + + return result; +} + +size_t TUniqBuildHelper::GetChecksNum() const { + return Checks.size(); +} + +TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& condenseResult, + TPositionHandle pos, TExprContext& ctx) const +{ + // Number of items for output list 2 for each table + 1 for params itself + const size_t nItems = Checks.size() * 2 + 1; + TVector<TExprBase> types; + types.reserve(nItems); + + types.emplace_back( + Build<TCoTypeOf>(ctx, pos) + .Value(RowsListArg) + .Done() + ); + + for (size_t i = 0; i < Checks.size(); i++) { + types.emplace_back( + Build<TCoTypeOf>(ctx, pos) + .Value(Checks[i].DictKeys) + .Done() + ); + types.emplace_back( + Build<TCoTypeOf>(ctx, pos) + .Value(Checks[i].UniqCmp) + .Done() + ); + } + + auto variantType = Build<TCoVariantType>(ctx, pos) + .UnderlyingType<TCoTupleType>() + .Add(types) + .Build() + .Done(); + + TVector<TExprBase> variants; + variants.reserve(nItems); + + variants.emplace_back( + Build<TCoVariant>(ctx, pos) + .Item(RowsListArg) + .Index().Build("0") + .VarType(variantType) + .Done() + ); + + for (size_t i = 0, ch = 1; i < Checks.size(); i++) { + variants.emplace_back( + Build<TCoVariant>(ctx, pos) + .Item(Checks[i].DictKeys) + .Index().Build(IntToString<10>(ch++)) + .VarType(variantType) + .Done() + ); + variants.emplace_back( + Build<TCoVariant>(ctx, pos) + .Item(Checks[i].UniqCmp) + .Index().Build(IntToString<10>(ch++)) + .VarType(variantType) + .Done() + ); + } + + return Build<TDqStage>(ctx, pos) + .Inputs() + .Add(condenseResult.StageInputs) + .Build() + .Program() + .Args(condenseResult.StageArgs) + .Body<TCoFlatMap>() + .Input(condenseResult.Stream) + .Lambda() + .Args({RowsListArg}) + .Body<TCoAsList>() + .Add(variants) + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); +} + +TDqPhyPrecompute TUniqBuildHelper::CreateInputPrecompute(const TDqStage& computeKeysStage, + TPositionHandle pos, TExprContext& ctx) const +{ + return Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(computeKeysStage) + .Index().Build("0") + .Build() + .Build() + .Done(); +} + +TVector<TExprBase> TUniqBuildHelper::CreateUniquePrecompute(const TDqStage& computeKeysStage, + TPositionHandle pos, TExprContext& ctx) const +{ + TVector<TExprBase> uniquePrecomputes; + uniquePrecomputes.reserve(Checks.size()); + for (size_t i = 0, output_index = 2; i < Checks.size(); i++, output_index += 2) { + uniquePrecomputes.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(computeKeysStage) + .Index().Build(IntToString<10>(output_index)) + .Build() + .Build() + .Done() + ); + } + return uniquePrecomputes; +} + +TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysStage, + const TKikimrTableDescription& table, TExprNode::TPtr _true, TPositionHandle pos, TExprContext& ctx) const +{ + TLookupNodes lookupNodes(Checks.size()); + + auto _false = MakeBool(pos, false, ctx); + + // 0 output - input stream itself so start with output 1 + // Each check produces 2 outputs + for (size_t i = 0, stage_out = 1; i < Checks.size(); i++, stage_out += 2) { + const auto indexId = Checks[i].IndexId; + if (indexId == TUniqCheckNodes::NOT_INDEX_ID) { + lookupNodes.Stages.emplace_back( + CreateLookupStageWithConnection(computeKeysStage, stage_out, *table.Metadata, _false, pos, ctx) + ); + } else { + YQL_ENSURE((size_t)indexId < table.Metadata->SecondaryGlobalIndexMetadata.size()); + lookupNodes.Stages.emplace_back( + CreateLookupStageWithConnection(computeKeysStage, stage_out, + *(table.Metadata->SecondaryGlobalIndexMetadata[indexId]), _false, pos, ctx) + ); + } + + lookupNodes.Args.emplace_back( + Build<TCoArgument>(ctx, pos) + .Name(TString("arg") + IntToString<10>(i)) + .Done() + ); + } + + return Build<TDqStage>(ctx, pos) + .Inputs() + .Add(lookupNodes.Stages) + .Build() + .Program() + .Args(lookupNodes.Args) + .Body<TCoCondense>() + .Input<TCoToStream>() + .Input<TCoExtend>() + .Add(TVector<TExprBase>(lookupNodes.Args.begin(), lookupNodes.Args.end())) + .Build() + .Build() + .State(_true) + .SwitchHandler() + .Args({"item", "state"}) + .Body(_false) + .Build() + .UpdateHandler() + .Args({"item", "state"}) + .Body(_false) + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); +} diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h new file mode 100644 index 00000000000..b580fdd8b5b --- /dev/null +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h @@ -0,0 +1,47 @@ +#pragma once + +#include "kqp_opt_phy_effects_impl.h" +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> + +namespace NYql { + class TKikimrTableDescription; +} + +namespace NKikimr::NKqp::NOpt { + +class TUniqBuildHelper { +public: + TUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, + NYql::TExprContext& ctx, bool skipPkCheck); + size_t GetChecksNum() const; + + NYql::NNodes::TDqStage CreateComputeKeysStage(const TCondenseInputResult& condenseResult, + NYql::TPositionHandle pos, NYql::TExprContext& ctx) const; + NYql::NNodes::TDqPhyPrecompute CreateInputPrecompute(const NYql::NNodes::TDqStage& computeKeysStage, + NYql::TPositionHandle pos, NYql::TExprContext& ctx) const; + TVector<NYql::NNodes::TExprBase> CreateUniquePrecompute(const NYql::NNodes::TDqStage& computeKeysStage, + NYql::TPositionHandle pos, NYql::TExprContext& ctx) const; + NYql::NNodes::TDqStage CreateLookupExistStage(const NYql::NNodes::TDqStage& computeKeysStage, + const NYql::TKikimrTableDescription& table, NYql::TExprNode::TPtr _true, + NYql::TPositionHandle pos, NYql::TExprContext& ctx) const; + +private: + struct TUniqCheckNodes { + using TIndexId = int; + static constexpr TIndexId NOT_INDEX_ID = -1; + NYql::TExprNode::TPtr DictKeys; + NYql::TExprNode::TPtr UniqCmp; + TIndexId IndexId = NOT_INDEX_ID; + }; + + static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector, + const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx); + static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg, + const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx, + bool skipPkCheck); + + NYql::NNodes::TCoArgument RowsListArg; + TVector<TUniqCheckNodes> Checks; +}; + +} diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index 58de519dfdf..e89cb670444 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -1,5 +1,6 @@ #include "kqp_opt_phy_effects_rules.h" #include "kqp_opt_phy_effects_impl.h" +#include "kqp_opt_phy_uniq_helper.h" #include <ydb/library/yql/providers/common/provider/yql_provider.h> @@ -300,6 +301,100 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput .Done(); } +TMaybe<TCondenseInputResult> CheckUniqueConstraint(const TExprBase& inputRows, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx) +{ + auto condenseResult = CondenseInput(inputRows, ctx); + if (!condenseResult) { + return {}; + } + + TUniqBuildHelper helper(table, pos, ctx, true); + if (helper.GetChecksNum() == 0) { + return condenseResult; + } + + auto computeKeysStage = helper.CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx); + auto inputPrecompute = helper.CreateInputPrecompute(computeKeysStage, pos, ctx); + auto uniquePrecomputes = helper.CreateUniquePrecompute(computeKeysStage, pos, ctx); + + auto _true = MakeBool(pos, true, ctx); + + auto aggrStage = helper.CreateLookupExistStage(computeKeysStage, table, _true, pos, ctx); + + // Returns <bool>: <true> - no existing keys, <false> - at least one key exists + auto noExistingKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(aggrStage) + .Index().Build("0") + .Build() + .Build() + .Done(); + + TCoArgument inputRowList(ctx.NewArgument(pos, "rows_list")); + TCoArgument noExistingKeysArg(ctx.NewArgument(pos, "no_existing_keys")); + + struct TUniqueCheckNodes { + TUniqueCheckNodes(size_t sz) { + Bodies.reserve(sz); + Args.reserve(sz); + } + TVector<TExprNode::TPtr> Bodies; + TVector<TCoArgument> Args; + } uniqueCheckNodes(helper.GetChecksNum()); + + for (size_t i = 0; i < helper.GetChecksNum(); i++) { + uniqueCheckNodes.Args.emplace_back(ctx.NewArgument(pos, "are_keys_unique")); + uniqueCheckNodes.Bodies.emplace_back(Build<TKqpEnsure>(ctx, pos) + .Value(_true) + .Predicate(uniqueCheckNodes.Args.back()) + .IssueCode().Build(ToString((ui32) TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION)) + .Message(MakeMessage("Duplicated keys found.", pos, ctx)) + .Done().Ptr() + ); + } + + auto noExistingKeysCheck = Build<TKqpEnsure>(ctx, pos) + .Value(_true) + .Predicate(noExistingKeysArg) + .IssueCode().Build(ToString((ui32) TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION)) + .Message(MakeMessage("Conflict with existing key.", pos, ctx)) + .Done(); + + auto body = Build<TCoToStream>(ctx, pos) + .Input<TCoJust>() + .Input<TCoIfStrict>() + .Predicate<TCoAnd>() + .Add(uniqueCheckNodes.Bodies) + .Add(noExistingKeysCheck) + .Build() + .ThenValue(inputRowList) + .ElseValue<TCoList>() + .ListType(ExpandType(pos, *inputRows.Ref().GetTypeAnn(), ctx)) + .Build() + .Build() + .Build() + .Done(); + + TVector<NYql::NNodes::TCoArgument> stageArgs; + stageArgs.reserve(uniqueCheckNodes.Args.size() + 2); + stageArgs.emplace_back(inputRowList); + stageArgs.insert(stageArgs.end(), uniqueCheckNodes.Args.begin(), uniqueCheckNodes.Args.end()); + stageArgs.emplace_back(noExistingKeysArg); + + TVector<TExprBase> stageInputs; + stageInputs.reserve(uniquePrecomputes.size() + 2); + stageInputs.emplace_back(inputPrecompute); + stageInputs.insert(stageInputs.end(), uniquePrecomputes.begin(), uniquePrecomputes.end()); + stageInputs.emplace_back(noExistingKeysPrecompute); + + return TCondenseInputResult { + .Stream = body, + .StageInputs = std::move(stageInputs), + .StageArgs = std::move(stageArgs) + }; +} + } // namespace TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const TExprBase& inputRows, @@ -316,12 +411,14 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, const auto& pk = table.Metadata->KeyColumnNames; - auto condenseInputResult = CondenseAndDeduplicateInput(inputRows, table, ctx); - if (!condenseInputResult) { + auto checkedInput = CheckUniqueConstraint(inputRows, table, pos, ctx); + if (!checkedInput) { return {}; } - auto inputRowsAndKeys = PrecomputeRowsAndKeys(*condenseInputResult, table, pos, ctx); + auto condenseInputResult = DeduplicateInput(checkedInput.GetRef(), table, ctx); + + auto inputRowsAndKeys = PrecomputeRowsAndKeys(condenseInputResult, table, pos, ctx); THashSet<TStringBuf> inputColumnsSet; for (const auto& column : inputColumns) { diff --git a/ydb/core/kqp/opt/physical/effects/ya.make b/ydb/core/kqp/opt/physical/effects/ya.make index 69eceb85dad..d95dff71cc0 100644 --- a/ydb/core/kqp/opt/physical/effects/ya.make +++ b/ydb/core/kqp/opt/physical/effects/ya.make @@ -6,6 +6,7 @@ SRCS( kqp_opt_phy_indexes.cpp kqp_opt_phy_insert_index.cpp kqp_opt_phy_insert.cpp + kqp_opt_phy_uniq_helper.cpp kqp_opt_phy_update_index.cpp kqp_opt_phy_update.cpp kqp_opt_phy_upsert_index.cpp diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp index fb97a0035f5..f7bb680a4d7 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp @@ -25,13 +25,14 @@ const NKikimrSchemeOp::EIndexType IG_UNIQUE = NKikimrSchemeOp::EIndexType::EInde NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query) { const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); return session.ExecuteDataQuery(query, txSettings, - TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync(); + TExecDataQuerySettings().KeepInQueryCache(true).CollectQueryStats(ECollectQueryStatsMode::Basic)).ExtractValueSync(); } NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString& query, TParams& params) { const auto txSettings = TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(); + return session.ExecuteDataQuery(query, txSettings, params, - TExecDataQuerySettings().KeepInQueryCache(true)).ExtractValueSync(); + TExecDataQuerySettings().KeepInQueryCache(true).CollectQueryStats(ECollectQueryStatsMode::Basic)).ExtractValueSync(); } void CreateTableWithMultishardIndex(Tests::TClient& client, NKikimrSchemeOp::EIndexType type) { @@ -186,6 +187,68 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) { } } + Y_UNIT_TEST(ReplaceFkAlreadyExist) { + TKikimrRunner kikimr(SyntaxV1Settings()); + CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + FillTable(session); + + { + const TString query(Q_(R"( + REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES + (2, 1000000000, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + + { + const TString query(Q_(R"( + REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES + (2, 1000000000, "v1"), + (2, 1000000001, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(UpdateOnFkAlreadyExist) { + TKikimrRunner kikimr(SyntaxV1Settings()); + CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + FillTable(session); + + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` ON (key, fk, value) VALUES + (2, 1000000000, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` ON (key, fk, value) VALUES + (2, 1000000000, "v1"), + (2, 1000000001, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + } + Y_UNIT_TEST(InsertFkPkOverlap) { TKikimrRunner kikimr(SyntaxV1Settings()); CreateTableWithMultishardIndexComplexFkPk(kikimr.GetTestClient(), IG_UNIQUE); @@ -204,7 +267,6 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) { } } - Y_UNIT_TEST(InsertNullInPk) { TKikimrRunner kikimr(SyntaxV1Settings()); CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE); @@ -420,6 +482,65 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) { } } + + Y_UNIT_TEST(ReplaceFkDuplicate) { + TKikimrRunner kikimr(SyntaxV1Settings()); + CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + FillTable(session); + + { + const TString query(Q_(R"( + REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES + (1173915, 1230000001, "v1"), + (1173916, 1230000001, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + + { + const TString query(Q_(R"( + REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES + (1173915, 1230000001, "v1"), + (1173916, 1230000002, "v1"), + (1173915, 1230000001, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + + { + const TString query(Q_(R"( + REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES + (1173915, NULL, "v1"), + (1173916, NULL, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query(Q_(R"( + REPLACE INTO `/Root/MultiShardIndexed` (key, fk, value) VALUES + (1173917, NULL, "v1"), + (1173917, NULL, "v1"); + )")); + + auto result = ExecuteDataQuery(session, query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable"); + const TString expected = R"([[#;[1173915u]];[#;[1173916u]];[#;[1173917u]];[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } } Y_UNIT_TEST_SUITE(KqpMultishardIndex) { |