diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2023-10-23 13:55:36 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-10-23 14:40:44 +0300 |
commit | 254b5cf39e7920253a249b7b97f09a87b6d20ca9 (patch) | |
tree | 954d03a62666ba0c65f27aa9269dbeff2c44f728 | |
parent | a93ff99ad5b0c8afb235dae7f963c4f6c46706f7 (diff) | |
download | ydb-254b5cf39e7920253a249b7b97f09a87b6d20ca9.tar.gz |
Fix UPDATE in the case of a unique constraint. We must allow updating a value …
Fix UPDATE in the case of a unique constraint. We must allow updating a value …
Pull Request resolved: https://github.com/ydb-platform/ydb/pull/403
3 files changed, 321 insertions, 58 deletions
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp index a115df7bb4d..dc17cc56519 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.cpp @@ -18,46 +18,111 @@ struct TLookupNodes { TVector<TCoArgument> Args; }; -TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t index, - const NYql::TKikimrTableMetadata& meta, TExprNode::TPtr _false, - TPositionHandle pos, TExprContext& ctx) +TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, size_t stageOut, + const NYql::TKikimrTableMetadata& mainTableMeta, int indexId, TExprNode::TPtr _false, + std::pair<TExprNode::TPtr, size_t> pkChecks, TPositionHandle pos, TExprContext& ctx) { - auto lookupKeysPrecompute = Build<TDqPhyPrecompute>(ctx, pos) + const NYql::TKikimrTableMetadata* meta; + if (indexId == -1) { + pkChecks.first.Reset(); + meta = &mainTableMeta; + } else { + YQL_ENSURE((size_t)indexId < mainTableMeta.SecondaryGlobalIndexMetadata.size()); + meta = mainTableMeta.SecondaryGlobalIndexMetadata[indexId].Get(); + } + + TVector<TExprBase> inputs; + TVector<TCoArgument> args; + + inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) .Connection<TDqCnValue>() .Output() .Stage(computeKeysStage) - .Index().Build(IntToString<10>(index)) + .Index().Build(IntToString<10>(stageOut)) .Build() .Build() - .Done(); + .Done() + ); + + args.emplace_back( + Build<TCoArgument>(ctx, pos) + .Name(TString("arg0")) + .Done() + ); + + NYql::TExprNode::TPtr lambda; + TVector<TExprBase> columnsToSelect; + + if (pkChecks.first) { + columnsToSelect.reserve(mainTableMeta.KeyColumnNames.size()); + + inputs.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(computeKeysStage) + .Index().Build(IntToString<10>(pkChecks.second)) + .Build() + .Build() + .Done() + ); + 
args.emplace_back(Build<TCoArgument>(ctx, pos) + .Name(TString("arg1")) + .Done() + ); + + for (const auto& key : mainTableMeta.KeyColumnNames) { + columnsToSelect.emplace_back(Build<TCoAtom>(ctx, pos) + .Value(key) + .Done() + ); + } + + lambda = Build<TCoLambda>(ctx, pos) + .Args({"row_from_index"}) + .Body<TCoOptionalIf>() + .Predicate<TCoNot>() + .Value<TCoContains>() + .Collection(args[1]) + .Lookup("row_from_index") + .Build() + .Build() + .Value(_false) + .Build() + .Done() + .Ptr(); + } else { + lambda = Build<TCoLambda>(ctx, pos) + .Args({"row"}) + .Body<TCoJust>() + .Input(_false) + .Build() + .Done() + .Ptr(); + } auto stage = Build<TDqStage>(ctx, pos) .Inputs() - .Add(lookupKeysPrecompute) + .Add(inputs) .Build() .Program() - .Args({"keys_list"}) - .Body<TCoMap>() + .Args(args) + .Body<TCoFlatMap>() .Input<TCoTake>() .Input<TKqpLookupTable>() - .Table(BuildTableMeta(meta, pos, ctx)) + .Table(BuildTableMeta(*meta, pos, ctx)) .LookupKeys<TCoIterator>() - .List("keys_list") + .List(args[0]) .Build() - .Columns() + .Columns<TCoAtomList>() + .Add(columnsToSelect) .Build() .Build() .Count<TCoUint64>() .Literal().Build("1") .Build() .Build() - .Lambda() - .Args({"row"}) - .Body<TCoJust>() - .Input(_false) - .Build() - .Build() + .Lambda(lambda) .Build() .Build() .Settings().Build() @@ -71,16 +136,39 @@ TDqCnUnionAll CreateLookupStageWithConnection(const TDqStage& computeKeysStage, .Done(); } +NYql::TExprNode::TPtr MakeUniqCheckDict(const TCoLambda& selector, + const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx) +{ + return Build<TCoToDict>(ctx, pos) + .List(rowsListArg) + .KeySelector(selector) + .PayloadSelector() + .Args({"stub"}) + .Body<TCoVoid>() + .Build() + .Build() + .Settings() + .Add().Build("One") + .Add().Build("Hashed") + .Build() + .Done().Ptr(); +} + } -TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg, +std::pair<TVector<TUniqBuildHelper::TUniqCheckNodes>, 
NYql::TExprNode::TPtr> TUniqBuildHelper::Prepare(const TCoArgument& rowsListArg, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx, bool skipPkCheck) { TVector<TUniqCheckNodes> checks; + NYql::TExprNode::TPtr pkDict; if (!skipPkCheck) { checks.emplace_back(MakeUniqCheckNodes(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx)); + } else { + // In case of update we must make additional filtering to exclude duplicate checks + // non pk constraint for rows where pk was given + pkDict = MakeUniqCheckDict(MakeTableKeySelector(table.Metadata, pos, ctx), rowsListArg, pos, ctx); } // make uniq check for each uniq constraint @@ -111,7 +199,7 @@ TVector<TUniqBuildHelper::TUniqCheckNodes> TUniqBuildHelper::Prepare(const TCoAr checks.back().IndexId = i; } - return checks; + return {checks, pkDict}; } TUniqBuildHelper::TUniqBuildHelper(const TKikimrTableDescription& table, @@ -124,19 +212,8 @@ TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCo const TExprBase& rowsListArg, TPositionHandle pos, TExprContext& ctx) { TUniqCheckNodes result; - auto dict = Build<TCoToDict>(ctx, pos) - .List(rowsListArg) - .KeySelector(selector) - .PayloadSelector() - .Args({"stub"}) - .Body<TCoVoid>() - .Build() - .Build() - .Settings() - .Add().Build("One") - .Add().Build("Hashed") - .Build() - .Done().Ptr(); + + auto dict = MakeUniqCheckDict(selector, rowsListArg, pos, ctx); result.DictKeys = Build<TCoDictKeys>(ctx, pos) .Dict(dict) @@ -155,14 +232,18 @@ TUniqBuildHelper::TUniqCheckNodes TUniqBuildHelper::MakeUniqCheckNodes(const TCo } size_t TUniqBuildHelper::GetChecksNum() const { - return Checks.size(); + return Checks.Size(); +} + +size_t TUniqBuildHelper::CalcComputeKeysStageOutputNum() const { + return Checks.Size() * 2 + 1; } TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& condenseResult, TPositionHandle pos, TExprContext& ctx) const { // Number of items for output list 2 for each table 
+ 1 for params itself - const size_t nItems = Checks.size() * 2 + 1; + const size_t nItems = CalcComputeKeysStageOutputNum(); TVector<TExprBase> types; types.reserve(nItems); @@ -172,7 +253,7 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co .Done() ); - for (size_t i = 0; i < Checks.size(); i++) { + for (size_t i = 0; i < Checks.Size(); i++) { types.emplace_back( Build<TCoTypeOf>(ctx, pos) .Value(Checks[i].DictKeys) @@ -185,6 +266,14 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co ); } + if (auto dict = Checks.GetPkDict()) { + types.emplace_back( + Build<TCoTypeOf>(ctx, pos) + .Value(dict) + .Done() + ); + } + auto variantType = Build<TCoVariantType>(ctx, pos) .UnderlyingType<TCoTupleType>() .Add(types) @@ -202,7 +291,8 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co .Done() ); - for (size_t i = 0, ch = 1; i < Checks.size(); i++) { + size_t ch = 1; + for (size_t i = 0; i < Checks.Size(); i++) { variants.emplace_back( Build<TCoVariant>(ctx, pos) .Item(Checks[i].DictKeys) @@ -219,6 +309,16 @@ TDqStage TUniqBuildHelper::CreateComputeKeysStage(const TCondenseInputResult& co ); } + if (auto dict = Checks.GetPkDict()) { + variants.emplace_back( + Build<TCoVariant>(ctx, pos) + .Item(dict) + .Index().Build(IntToString<10>(ch++)) + .VarType(variantType) + .Done() + ); + } + return Build<TDqStage>(ctx, pos) .Inputs() .Add(condenseResult.StageInputs) @@ -256,8 +356,8 @@ TVector<TExprBase> TUniqBuildHelper::CreateUniquePrecompute(const TDqStage& comp TPositionHandle pos, TExprContext& ctx) const { TVector<TExprBase> uniquePrecomputes; - uniquePrecomputes.reserve(Checks.size()); - for (size_t i = 0, output_index = 2; i < Checks.size(); i++, output_index += 2) { + uniquePrecomputes.reserve(Checks.Size()); + for (size_t i = 0, output_index = 2; i < Checks.Size(); i++, output_index += 2) { uniquePrecomputes.emplace_back(Build<TDqPhyPrecompute>(ctx, pos) 
.Connection<TDqCnValue>() .Output() @@ -274,25 +374,21 @@ TVector<TExprBase> TUniqBuildHelper::CreateUniquePrecompute(const TDqStage& comp TDqStage TUniqBuildHelper::CreateLookupExistStage(const TDqStage& computeKeysStage, const TKikimrTableDescription& table, TExprNode::TPtr _true, TPositionHandle pos, TExprContext& ctx) const { - TLookupNodes lookupNodes(Checks.size()); + TLookupNodes lookupNodes(Checks.Size()); auto _false = MakeBool(pos, false, ctx); + // last stage output is pk dict for update mode + int pkDictOutputId = Checks.GetPkDict() ? CalcComputeKeysStageOutputNum() : -1; + // 0 output - input stream itself so start with output 1 // Each check produces 2 outputs - for (size_t i = 0, stage_out = 1; i < Checks.size(); i++, stage_out += 2) { + for (size_t i = 0, stage_out = 1; i < Checks.Size(); i++, stage_out += 2) { const auto indexId = Checks[i].IndexId; - if (indexId == TUniqCheckNodes::NOT_INDEX_ID) { - lookupNodes.Stages.emplace_back( - CreateLookupStageWithConnection(computeKeysStage, stage_out, *table.Metadata, _false, pos, ctx) - ); - } else { - YQL_ENSURE((size_t)indexId < table.Metadata->SecondaryGlobalIndexMetadata.size()); - lookupNodes.Stages.emplace_back( - CreateLookupStageWithConnection(computeKeysStage, stage_out, - *(table.Metadata->SecondaryGlobalIndexMetadata[indexId]), _false, pos, ctx) - ); - } + lookupNodes.Stages.emplace_back( + CreateLookupStageWithConnection(computeKeysStage, stage_out, *table.Metadata, indexId, + _false, {Checks.GetPkDict(), pkDictOutputId}, pos, ctx) + ); lookupNodes.Args.emplace_back( Build<TCoArgument>(ctx, pos) diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h index b580fdd8b5b..a188b4afe53 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_uniq_helper.h @@ -11,6 +11,8 @@ namespace NKikimr::NKqp::NOpt { class TUniqBuildHelper { public: + // table - 
metadata of table + // skipPkCheck - false for insert mode, generate check on PK to issue an arror on PK conflict TUniqBuildHelper(const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool skipPkCheck); size_t GetChecksNum() const; @@ -34,14 +36,39 @@ private: TIndexId IndexId = NOT_INDEX_ID; }; + class TChecks { + public: + TChecks(std::pair<TVector<TUniqCheckNodes>, NYql::TExprNode::TPtr>&& pair) + : Checks(std::move(pair.first)) + , PkDict(pair.second) + {} + + size_t Size() const { + return Checks.size(); + } + + const TUniqCheckNodes& operator [](size_t i) const { + return Checks[i]; + } + + const NYql::TExprNode::TPtr GetPkDict() const { + return PkDict; + } + private: + const TVector<TUniqCheckNodes> Checks; + const NYql::TExprNode::TPtr PkDict; + }; + + size_t CalcComputeKeysStageOutputNum() const; + static TUniqCheckNodes MakeUniqCheckNodes(const NYql::NNodes::TCoLambda& selector, const NYql::NNodes::TExprBase& rowsListArg, NYql::TPositionHandle pos, NYql::TExprContext& ctx); - static TVector<TUniqCheckNodes> Prepare(const NYql::NNodes::TCoArgument& rowsListArg, + static std::pair<TVector<TUniqCheckNodes>, NYql::TExprNode::TPtr> Prepare(const NYql::NNodes::TCoArgument& rowsListArg, const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx, bool skipPkCheck); NYql::NNodes::TCoArgument RowsListArg; - TVector<TUniqCheckNodes> Checks; + const TChecks Checks; }; } diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp index 6b1534ac602..32eab8a75e4 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp @@ -35,7 +35,7 @@ NYdb::NTable::TDataQueryResult ExecuteDataQuery(TSession& session, const TString TExecDataQuerySettings().KeepInQueryCache(true).CollectQueryStats(ECollectQueryStatsMode::Basic)).ExtractValueSync(); } -void 
CreateTableWithMultishardIndex(Tests::TClient& client, NKikimrSchemeOp::EIndexType type) { +void CreateTableWithMultishardIndex(Tests::TClient& client, NKikimrSchemeOp::EIndexType type, bool dataColumn = false) { const TString scheme = R"(Name: "MultiShardIndexed" Columns { Name: "key" Type: "Uint64" } Columns { Name: "fk" Type: "Uint32" } @@ -49,7 +49,10 @@ void CreateTableWithMultishardIndex(Tests::TClient& client, NKikimrSchemeOp::EIn bool parseOk = ::google::protobuf::TextFormat::ParseFromString(scheme, &desc); UNIT_ASSERT(parseOk); - auto status = client.TClient::CreateTableWithUniformShardedIndex("/Root", desc, "index", {"fk"}, type); + TVector<TString> dataColumns; + if (dataColumn) + dataColumns.emplace_back("value"); + auto status = client.TClient::CreateTableWithUniformShardedIndex("/Root", desc, "index", {"fk"}, type, dataColumns); UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK); } @@ -247,6 +250,16 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) { { const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` SET fk = 1000000000; + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + + { + const TString query(Q_(R"( UPDATE `/Root/MultiShardIndexed` SET fk = 1000000000 WHERE value = "v2"; )")); @@ -270,9 +283,26 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) { const TString expected = R"([[[1000000000u];[1u]];[[1000000001u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])"; UNIT_ASSERT_VALUES_EQUAL(yson, expected); } +return; + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` SET fk = 1000000000 WHERE value = "v1"; + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable"); + const TString expected = 
R"([[[1000000000u];[1u]];[[1000000001u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } - Y_UNIT_TEST(UpdateOnFkSelectResultAlreadyExist) { + Y_UNIT_TEST(UpdateFkSameValue) { TKikimrRunner kikimr(SyntaxV1Settings()); CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE); auto db = kikimr.GetTableClient(); @@ -281,14 +311,124 @@ Y_UNIT_TEST_SUITE(KqpUniqueIndex) { { const TString query(Q_(R"( - UPDATE `/Root/MultiShardIndexed` ON + UPDATE `/Root/MultiShardIndexed` SET fk = 1000000000 WHERE key = 1; + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable"); + const TString expected = R"([[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` SET fk = 1000000000 WHERE value = "v1"; + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable"); + const TString expected = R"([[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } + + Y_UNIT_TEST(UpdateOnFkSelectResultSameValue) { + TKikimrRunner kikimr(SyntaxV1Settings()); + CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + FillTable(session); + + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` ON SELECT * FROM `/Root/MultiShardIndexed` WHERE key = 2; )")); auto result = 
ExecuteDataQuery(session, query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` ON + SELECT * FROM `/Root/MultiShardIndexed`; + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable"); + const TString expected = R"([[[1000000000u];[1u]];[[2000000000u];[2u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } + + void UpdateOnHidenChanges(bool dataColumn) { + TKikimrRunner kikimr(SyntaxV1Settings()); + CreateTableWithMultishardIndex(kikimr.GetTestClient(), IG_UNIQUE, dataColumn); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + FillTable(session); + + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` ON (key, fk, value) VALUES + (2, 1000000000, "mod_2"), + (1, 1000000000, "mod_1"); + )")); + + auto result = ExecuteDataQuery(session, query); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); } + + { + const TString query(Q_(R"( + UPDATE `/Root/MultiShardIndexed` ON (key, fk, value) VALUES + (2, 1000000000, "mod_22"), + (1, 1000000001, "mod_11"); + )")); + + auto result = ExecuteDataQuery(session, query); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed/index/indexImplTable"); + if (dataColumn) { + const TString expected = R"([[[1000000000u];[2u];["mod_22"]];[[1000000001u];[1u];["mod_11"]];[[3000000000u];[3u];["v3"]];[[4294967295u];[4u];["v4"]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } else { + const 
TString expected = R"([[[1000000000u];[2u]];[[1000000001u];[1u]];[[3000000000u];[3u]];[[4294967295u];[4u]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } + + { + const auto yson = ReadTableToYson(session, "/Root/MultiShardIndexed"); + const TString expected = R"([[[1u];[1000000001u];["mod_11"]];[[2u];[1000000000u];["mod_22"]];[[3u];[3000000000u];["v3"]];[[4u];[4294967295u];["v4"]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } + + Y_UNIT_TEST_TWIN(UpdateOnHidenChanges, DataColumn) { + UpdateOnHidenChanges(DataColumn); } Y_UNIT_TEST(UpdateOnFkAlreadyExist) { |