diff options
6 files changed, 428 insertions, 56 deletions
diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp index a8b3560f72e..54d2e81e059 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects.cpp @@ -163,16 +163,25 @@ TMaybe<TCondenseInputResult> CondenseInputToDictByPk(const TExprBase& input, con }; } -TCoLambda MakeTableKeySelector(const TKikimrTableMetadataPtr meta, TPositionHandle pos, TExprContext& ctx) { +TCoLambda MakeTableKeySelector(const TKikimrTableMetadataPtr meta, TPositionHandle pos, TExprContext& ctx, TMaybe<int> tupleId) { auto keySelectorArg = TCoArgument(ctx.NewArgument(pos, "key_selector")); TVector<TExprBase> keyTuples; keyTuples.reserve(meta->KeyColumnNames.size()); + + TExprBase selector = keySelectorArg; + if (tupleId) { + selector = Build<TCoNth>(ctx, pos) + .Tuple(keySelectorArg) + .Index().Build(*tupleId) + .Done(); + } + for (const auto& key : meta->KeyColumnNames) { auto tuple = Build<TCoNameValueTuple>(ctx, pos) .Name().Build(key) .Value<TCoMember>() - .Struct(keySelectorArg) + .Struct(selector) .Name().Build(key) .Build() .Done(); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h index 2336afed97e..27ec92f90e4 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h @@ -37,7 +37,7 @@ NYql::NNodes::TMaybeNode<NYql::NNodes::TDqPhyPrecompute> PrecomputeTableLookupDi // Creates key selector using PK of given table NYql::NNodes::TCoLambda MakeTableKeySelector(const NYql::TKikimrTableMetadataPtr tableMeta, NYql::TPositionHandle pos, - NYql::TExprContext& ctx); + NYql::TExprContext& ctx, TMaybe<int> tupleId = {}); // Creates key selector using user provided index columns. // It is important to note. This function looks at the _user_prvided_ set of columns. @@ -56,6 +56,10 @@ NYql::NNodes::TCoLambda MakeRowsPayloadSelector(const NYql::NNodes::TCoAtomList& NYql::NNodes::TExprBase MakeRowsFromDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector<TString>& dictKeys, const THashSet<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); +// Same as MakeRowsFromDict but skip rows which marked as non changed (true in second tuple) +NYql::NNodes::TExprBase MakeRowsFromTupleDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector<TString>& dictKeys, + const THashSet<TStringBuf>& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); + NYql::NNodes::TMaybeNode<NYql::NNodes::TDqCnUnionAll> MakeConditionalInsertRows(const NYql::NNodes::TExprBase& input, const NYql::TKikimrTableDescription& table, bool abortOnError, NYql::TPositionHandle pos, NYql::TExprContext& ctx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp index 1e196ff7e2f..c9232fd7f26 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp @@ -228,4 +228,80 @@ TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector<TString>& .Done(); } +TExprBase MakeRowsFromTupleDict(const TDqPhyPrecompute& dict, const TVector<TString>& dictKeys, + const THashSet<TStringBuf>& columns, TPositionHandle pos, TExprContext& ctx) +{ + THashSet<TString> dictKeysSet(dictKeys.begin(), dictKeys.end()); + auto dictTupleArg = TCoArgument(ctx.NewArgument(pos, "dict_tuple")); + + TVector<TExprBase> rowTuples; + for (const auto& column : columns) { + auto columnAtom = ctx.NewAtom(pos, column); + auto tupleIndex = dictKeysSet.contains(column) + ? 0 // Key + : 1; // Payload + + auto extractor = Build<TCoNth>(ctx, pos) + .Tuple(dictTupleArg) + .Index().Build(tupleIndex) + .Done().Ptr(); + + if (tupleIndex) { + extractor = Build<TCoNth>(ctx, pos) + .Tuple(extractor) + .Index().Build(0) + .Done().Ptr(); + } + + auto tuple = Build<TCoNameValueTuple>(ctx, pos) + .Name(columnAtom) + .Value<TCoMember>() + .Struct(extractor) + .Name(columnAtom) + .Build() + .Done(); + + rowTuples.emplace_back(std::move(tuple)); + } + + auto computeRowsStage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(dict) + .Build() + .Program() + .Args({"dict"}) + .Body<TCoIterator>() + .List<TCoFlatMap>() + .Input<TCoDictItems>() + .Dict("dict") + .Build() + .Lambda() + .Args(dictTupleArg) + .Body<TCoOptionalIf>() + .Predicate<TCoNth>() + .Tuple<TCoNth>() + .Tuple(dictTupleArg) + .Index().Build(1) + .Build() + .Index().Build(1) + .Build() + .Value<TCoAsStruct>() + .Add(rowTuples) + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); + + return Build<TDqCnUnionAll>(ctx, pos) + .Output() + .Stage(computeRowsStage) + .Index().Build("0") + .Build() + .Done(); +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index 30f7eb4ff1b..4cc6725c3c0 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -92,16 +92,13 @@ TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseRes // Return set of data columns need to be save during index update THashSet<TString> CreateDataColumnSetToRead( - const TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>>& indexes, - const THashSet<TStringBuf>& inputColumns) + const TVector<std::pair<TExprNode::TPtr, const TIndexDescription*>>& indexes) { THashSet<TString> res; for (const auto& index : indexes) { for (const auto& col : index.second->DataColumns) { - if (!inputColumns.contains(col)) { - res.emplace(col); - } + res.emplace(col); } } @@ -183,7 +180,7 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput TExprContext& ctx) { // Check if we can update index table from just input data - bool allColumnFromInput = true; + bool allColumnFromInput = true; // - indicate all data from input for (const auto& column : indexColumns) { allColumnFromInput = allColumnFromInput && inputColumns.contains(column); } @@ -257,7 +254,10 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput Build<TCoNameValueTuple>(ctx, pos) .Name(columnAtom) .Value<TCoMember>() - .Struct(payload) + .Struct<TCoNth>() + .Tuple(payload) + .Index().Build(0) + .Build() .Name(columnAtom) .Build() .Done() @@ -265,13 +265,21 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput } } + auto presentKeyRowStruct = Build<TCoAsStruct>(ctx, pos) + .Add(presentKeyRow) + .Done(); + TExprBase flatmapBody = Build<TCoIfPresent>(ctx, pos) .Optional(lookup) .PresentHandler<TCoLambda>() .Args(payload) - .Body<TCoJust>() - .Input<TCoAsStruct>() - .Add(presentKeyRow) + .Body<TCoFlatOptionalIf>() + .Predicate<TCoNth>() + .Tuple(payload) + .Index().Build(1) + .Build() + .Value<TCoJust>() + .Input(presentKeyRowStruct) .Build() .Build() .Build() @@ -478,7 +486,7 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, // For UPSERT check that indexes is not empty YQL_ENSURE(mode == TKqpPhyUpsertIndexMode::UpdateOn || indexes); - THashSet<TString> indexDataColumns = CreateDataColumnSetToRead(indexes, inputColumnsSet); + THashSet<TString> indexDataColumns = CreateDataColumnSetToRead(indexes); THashSet<TString> indexKeyColumns = CreateKeyColumnSetToRead(indexes); auto lookupDict = PrecomputeTableLookupDict(inputRowsAndKeys.KeysPrecompute, table, indexDataColumns, indexKeyColumns, pos, ctx); @@ -520,17 +528,7 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, indexTableColumns.insert(column); } - if (indexKeyColumnsUpdated) { - // Have to delete old index value from index table in case when index key columns were updated - auto deleteIndexKeys = MakeRowsFromDict(lookupDict.Cast(), pk, indexTableColumns, pos, ctx); - - auto indexDelete = Build<TKqlDeleteRows>(ctx, pos) - .Table(tableNode) - .Input(deleteIndexKeys) - .Done(); - - effects.emplace_back(indexDelete); - } + auto indexTableColumnsWithoutData = indexTableColumns; bool indexDataColumnsUpdated = false; for (const auto& column : indexDesc->DataColumns) { @@ -542,13 +540,178 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, } } + // Need to calc is the updated row in index same + TVector<TExprBase> payloadTuples; + TVector<TExprBase> keyTuples; + auto payloadSelectorArg = TCoArgument(ctx.NewArgument(pos, "payload_selector_row_for_index")); + auto payload_table_row = TCoArgument(ctx.NewArgument(pos, "payload_table_row")); + + TVector<TExprBase> inputRowsForIndex; + inputRowsForIndex.reserve(indexTableColumns.size()); + + TVector<TExprBase> lookupRow; + lookupRow.reserve(indexTableColumns.size()); + + auto inputItem = TCoArgument(ctx.NewArgument(pos, "input_item_" + indexDesc->Name)); + for (const auto& column : indexTableColumns) { + if (table.GetKeyColumnIndex(TString(column))) { + continue; + } + auto columnAtom = ctx.NewAtom(pos, column); + + if (inputColumnsSet.contains(column)) { + inputRowsForIndex.emplace_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name(columnAtom) + .Value<TCoMember>() + .Struct(inputItem) + .Name(columnAtom) + .Build() + .Done()); + + lookupRow.emplace_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name(columnAtom) + .Value<TCoMember>() + .Struct(payload_table_row) + .Name(columnAtom) + .Build() + .Done()); + } + + payloadTuples.emplace_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name(columnAtom) + .Value<TCoMember>() + .Struct<TCoNth>() + .Tuple(payloadSelectorArg) + .Index().Build(1) + .Build() + .Name(columnAtom) + .Build() + .Done()); + } + + auto inputArg = TCoArgument(ctx.NewArgument(pos, "recalc_input_arg_" + indexDesc->Name)); + + auto cmp = ctx.Builder(pos) + .Callable("AggrNotEquals") + .Add(0, Build<TCoAsStruct>(ctx, pos) + .Add(inputRowsForIndex) + .Done().Ptr()) + .Add(1, Build<TCoAsStruct>(ctx, pos) + .Add(lookupRow) + .Done().Ptr()) + .Seal().Build(); + + for (const auto& key : pk) { + auto tuple = Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(key) + .Value<TCoMember>() + .Struct(inputItem) + .Name().Build(key) + .Build() + .Done(); + + keyTuples.emplace_back(tuple); + } + + auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "recalc_dict_arg_" + indexDesc->Name)); + auto reComputeDictStage = Build<TDqStage>(ctx, pos) + .Inputs() + .Add(inputRowsAndKeys.RowsPrecompute) // input rows + .Add(lookupDict.Cast()) // dict contains loockuped from table rows + .Build() + .Program() + .Args({inputArg, lookupDictArg}) + .Body<TCoIterator>() + .List<TCoMap>() + .Input<TCoToList>() + .Optional<TCoJust>() + .Input(lookupDictArg) + .Build() + .Build() + .Lambda() + .Args({"collection"}) + .Body<TCoToDict>() + .List<TCoFlatMap>() + .Input(inputArg) + .Lambda() + .Args({inputItem}) + .Body<TCoMap>() + .Input<TCoLookup>() + .Collection("collection") + .Lookup<TCoAsStruct>() + .Add(keyTuples) + .Build() + .Build() + .Lambda<TCoLambda>() + .Args({payload_table_row}) + .Body<TExprList>() // Key of tuple - key columns of index + .Add<TCoAsStruct>() + .Add(keyTuples) + .Build() + .Add(payload_table_row) // rows read from main table + .Add(cmp) // comparation on rows from input and from index. true if not equal + .Build() + .Build() + .Build() + .Build() + .Build() + .KeySelector(MakeTableKeySelector(table.Metadata, pos, ctx, 0)) + .PayloadSelector<TCoLambda>() + .Args({payloadSelectorArg}) + .Body<TExprList>() + .Add<TCoNth>() + .Tuple(payloadSelectorArg) + .Index().Build(1) + .Build() + .Add<TCoNth>() + .Tuple(payloadSelectorArg) + .Index().Build(2) + .Build() + .Build() + .Build() + .Settings() + .Add().Build("One") + .Add().Build("Hashed") + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Settings().Build() + .Done(); + + auto lookupDictRecomputed = Build<TDqPhyPrecompute>(ctx, pos) + .Connection<TDqCnValue>() + .Output() + .Stage(reComputeDictStage) + .Index().Build("0") + .Build() + .Build() + .Done(); + + if (indexKeyColumnsUpdated) { + // Have to delete old index value from index table in case when index key columns were updated + auto deleteIndexKeys = MakeRowsFromTupleDict(lookupDictRecomputed, pk, indexTableColumnsWithoutData, pos, ctx); + + auto indexDelete = Build<TKqlDeleteRows>(ctx, pos) + .Table(tableNode) + .Input(deleteIndexKeys) + .Done(); + + effects.emplace_back(indexDelete); + } + // Index update always required for UPSERT operations as they can introduce new table rows bool needIndexTableUpdate = mode != TKqpPhyUpsertIndexMode::UpdateOn; // Index table update required in case when index key or data columns were updated needIndexTableUpdate = needIndexTableUpdate || indexKeyColumnsUpdated || indexDataColumnsUpdated; if (needIndexTableUpdate) { - auto upsertIndexRows = MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDict.Cast(), + auto upsertIndexRows = MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDictRecomputed, inputColumnsSet, indexTableColumns, table, pos, ctx); auto indexUpsert = Build<TKqlUpsertRows>(ctx, pos) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 7430cc5bd63..4b9ad3cf363 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -593,6 +593,122 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { UNIT_ASSERT_VALUES_EQUAL_C(result2.GetStatus(), NYdb::EStatus::ABORTED, result2.GetIssues().ToString().c_str()); } + void DoUpsertWithoutIndexUpdate(bool uniq) { + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetKqpSettings({setting}); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto tableBuilder = db.GetTableBuilder(); + tableBuilder + .AddNullableColumn("Key", EPrimitiveType::String) + .AddNullableColumn("fk1", EPrimitiveType::String) + .AddNullableColumn("fk2", EPrimitiveType::Int32) + .AddNullableColumn("fk3", EPrimitiveType::Uint64) + .AddNullableColumn("Value", EPrimitiveType::String); + tableBuilder.SetPrimaryKeyColumns(TVector<TString>{"Key"}); + if (uniq) { + tableBuilder.AddUniqueSecondaryIndex("Index", TVector<TString>{"fk1", "fk2", "fk3"}); + } else { + tableBuilder.AddSecondaryIndex("Index", TVector<TString>{"fk1", "fk2", "fk3"}); + } + auto result = session.CreateTable("/Root/TestTable", tableBuilder.Build()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + { + // Upsert - add new row + const TString query2 = Q1_(R"( + UPSERT INTO `/Root/TestTable` (Key, fk1, fk3, Value) VALUES + ("Primary1", "fk1_str", 1000000000u, "Value1_1"); + )"); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + auto result = session.ExecuteDataQuery( + query2, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + execSettings) + .ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); + + // One read from main table + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 0); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access().size(), 0); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access().size(), 2); + + // One update of main table + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).updates().rows(), 1); + UNIT_ASSERT( !stats.query_phases(4).table_access(0).has_deletes()); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(1).name(), "/Root/TestTable/Index/indexImplTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(1).updates().rows(), 1); + UNIT_ASSERT( !stats.query_phases(4).table_access(1).has_deletes()); + + { + const auto& yson = ReadTablePartToYson(session, "/Root/TestTable/Index/indexImplTable"); + const TString expected = R"([[["fk1_str"];#;[1000000000u];["Primary1"]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } + + { + // Same query - should not touch index + const TString query2 = Q1_(R"( + UPSERT INTO `/Root/TestTable` (Key, fk1, fk3, Value) VALUES + ("Primary1", "fk1_str", 1000000000u, "Value1_1"); + )"); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + auto result = session.ExecuteDataQuery( + query2, + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + execSettings) + .ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); + + // One read from main table + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access().size(), 0); + + // One update of main table + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).updates().rows(), 1); + UNIT_ASSERT( !stats.query_phases(4).table_access(0).has_deletes()); + + { + const auto& yson = ReadTablePartToYson(session, "/Root/TestTable/Index/indexImplTable"); + const TString expected = R"([[["fk1_str"];#;[1000000000u];["Primary1"]]])"; + UNIT_ASSERT_VALUES_EQUAL(yson, expected); + } + } + } + + Y_UNIT_TEST_TWIN(DoUpsertWithoutIndexUpdate, UniqIndex) { + if (UniqIndex) + return; // TODO: fixit!!! + DoUpsertWithoutIndexUpdate(UniqIndex); + } + Y_UNIT_TEST(UpsertWithoutExtraNullDelete) { auto setting = NKikimrKqp::TKqpSetting(); auto serverSettings = TKikimrSettings() @@ -637,18 +753,22 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TestTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access().size(), 2); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access(0).name(), "/Root/TestTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access(0).updates().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access().size(), 2); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).updates().rows(), 1); + UNIT_ASSERT( !stats.query_phases(4).table_access(0).has_deletes()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access(1).name(), "/Root/TestTable/Index/indexImplTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access(1).updates().rows(), 1); - UNIT_ASSERT(!stats.query_phases(3).table_access(0).has_deletes()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(1).name(), "/Root/TestTable/Index/indexImplTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(1).updates().rows(), 1); + UNIT_ASSERT( !stats.query_phases(4).table_access(1).has_deletes()); } { @@ -666,23 +786,26 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { .ExtractValueSync(); UNIT_ASSERT(result.IsSuccess()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TestTable"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); - int idx = 3; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access().size(), 2); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).updates().rows(), 1); + UNIT_ASSERT( !stats.query_phases(4).table_access(0).has_deletes()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(0).name(), "/Root/TestTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(0).updates().rows(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(1).name(), "/Root/TestTable/Index/indexImplTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(1).updates().rows(), 1); - UNIT_ASSERT(stats.query_phases(idx).table_access(1).has_deletes()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(1).deletes().rows(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(1).name(), "/Root/TestTable/Index/indexImplTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(1).updates().rows(), 1); + UNIT_ASSERT( stats.query_phases(4).table_access(1).has_deletes()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(1).deletes().rows(), 1); } { @@ -701,22 +824,21 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { .ExtractValueSync(); UNIT_ASSERT(result.IsSuccess()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); // One read from main table UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TestTable"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 1); - int idx = 3; - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(3).table_access().size(), 0); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access().size(), 1); // One update of main table - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(0).name(), "/Root/TestTable"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(0).updates().rows(), 1); - - // No touching index - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(1).name(), "/Root/TestTable/Index/indexImplTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).name(), "/Root/TestTable"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(4).table_access(0).updates().rows(), 1); + UNIT_ASSERT( !stats.query_phases(4).table_access(0).has_deletes()); { const auto& yson = ReadTablePartToYson(session, "/Root/TestTable/Index/indexImplTable"); @@ -757,7 +879,6 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(idx).table_access(0).updates().rows(), 1); // Thats it, no phase for index table - we remove it on compile time - { const auto& yson = ReadTablePartToYson(session, "/Root/TestTable/Index/indexImplTable"); const TString expected = R"([[["Secondary1_1"];["Primary1"]]])"; @@ -799,7 +920,6 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { UNIT_ASSERT_VALUES_EQUAL(yson, expected); } } - } Y_UNIT_TEST(UpsertWithNullKeysSimple) { @@ -4699,7 +4819,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda auto reads = table["reads"].GetArraySafe(); UNIT_ASSERT_VALUES_EQUAL(reads.size(), 1); UNIT_ASSERT_VALUES_EQUAL(reads[0]["type"], "Lookup"); - UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3); } { // Check that data colomns not from involved index aren't in read columns diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index 4be235b4b45..dfcfb122669 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -581,7 +581,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); } Y_UNIT_TEST(IndexReplace) { @@ -604,7 +604,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); } Y_UNIT_TEST(IndexUpdateOn) { @@ -627,7 +627,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); } Y_UNIT_TEST(IndexDeleteOn) { |
