diff options
author | vpolka <vpolka@yandex-team.com> | 2023-10-13 12:45:15 +0300 |
---|---|---|
committer | vpolka <vpolka@yandex-team.com> | 2023-10-13 13:06:39 +0300 |
commit | ecebb418cadc3c4481096b9faaa7d7775a96c572 (patch) | |
tree | 31bb9aa0ead55cb167fc4c9bf4a990232775251b | |
parent | 24eb86b163be9a818f095e0d99a4887fd8f6eec0 (diff) | |
download | ydb-ecebb418cadc3c4481096b9faaa7d7775a96c572.tar.gz |
KIKIMR-15250: allow top sort push-down when the sort columns that are not a PK prefix are present in the index data columns
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp | 56 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp | 48 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 145 |
3 files changed, 241 insertions, 8 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index 6ba7a8e7d8..acb6b955e9 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -58,8 +58,8 @@ TCoAtomList MergeColumns(const NNodes::TCoAtomList& col1, const TVector<TString> .Done(); } -bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { - auto checkKey = [keySelector, &tableDesc, columns] (const TExprBase& key, ui32 index) { +bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc) { + auto checkKey = [keySelector, &tableDesc] (const TExprBase& key, ui32 index) { if (!key.Maybe<TCoMember>()) { return false; } @@ -75,6 +75,42 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc return false; } + return true; + }; + + auto lambdaBody = keySelector.Body(); + if (auto maybeTuple = lambdaBody.Maybe<TExprList>()) { + auto tuple = maybeTuple.Cast(); + for (size_t i = 0; i < tuple.Size(); ++i) { + if (!checkKey(tuple.Item(i), i)) { + return false; + } + } + } else { + if (!checkKey(lambdaBody, 0)) { + return false; + } + } + + return true; +} + +bool IsTableExistsKeySelector(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { + auto checkKey = [keySelector, &tableDesc, columns] (const TExprBase& key) { + if (!key.Maybe<TCoMember>()) { + return false; + } + + auto member = key.Cast<TCoMember>(); + if (member.Struct().Raw() != keySelector.Args().Arg(0).Raw()) { + return false; + } + + auto column = member.Name().StringValue(); + if (!tableDesc.Metadata->Columns.contains(column)) { + return false; + } + if (columns) { columns->emplace_back(std::move(column)); } @@ -86,12 +122,12 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc if (auto maybeTuple = 
lambdaBody.Maybe<TExprList>()) { auto tuple = maybeTuple.Cast(); for (size_t i = 0; i < tuple.Size(); ++i) { - if (!checkKey(tuple.Item(i), i)) { + if (!checkKey(tuple.Item(i))) { return false; } } } else { - if (!checkKey(lambdaBody, 0)) { + if (!checkKey(lambdaBody)) { return false; } } @@ -99,8 +135,8 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc return true; } -bool CanPushTopSort(const TCoTopBase& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) { - return IsKeySelectorPkPrefix(node.KeySelectorLambda(), tableDesc, columns); +bool CanPushTopSort(const TCoTopBase& node, const TKikimrTableDescription& indexDesc, TVector<TString>* columns) { + return IsTableExistsKeySelector(node.KeySelectorLambda(), indexDesc, columns); } struct TReadMatch { @@ -267,7 +303,9 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx, structMembers.push_back(member); } - readIndexTable = Build<TCoMap>(ctx, read.Pos()) + // We need to save order for TopSort, otherwise TopSort will be replaced by Top during optimization (https://st.yandex-team.ru/YQL-15415) + readIndexTable = Build<TCoMapBase>(ctx, read.Pos()) + .CallableName(readIndexTable.Maybe<TCoTopSort>() ? TCoOrderedMap::CallableName() : TCoMap::CallableName()) .Input(readIndexTable) .Lambda() .Args({arg}) @@ -591,6 +629,8 @@ TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ct return node; } + bool needSort = node.Maybe<TCoTopSort>() && !IsKeySelectorPkPrefix(topBase.KeySelectorLambda(), indexDesc); + auto filter = [&](const TExprBase& in) mutable { auto sortInput = in; @@ -603,7 +643,7 @@ TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ct } auto newTop = Build<TCoTopBase>(ctx, node.Pos()) - .CallableName(node.Ref().Content()) + .CallableName(needSort ? 
TCoTopSort::CallableName() : TCoTop::CallableName()) .Input(sortInput) .KeySelectorLambda(ctx.DeepCopyLambda(topBase.KeySelectorLambda().Ref())) .SortDirections(topBase.SortDirections()) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp index 208067a29c..25544832fe 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp @@ -1763,6 +1763,54 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) { Y_UNIT_TEST(WriteIntoRenamingAsyncIndex) { CheckWriteIntoRenamingIndex(true); } + + Y_UNIT_TEST_TWIN(CheckPushTopSort, StreamLookup) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(StreamLookup); + + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig); + + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + CreateTableWithMultishardIndexAndDataColumn(kikimr.GetTestClient()); + + AssertSuccessResult(session.ExecuteDataQuery(Q1_(R"( + UPSERT INTO `/Root/MultiShardIndexedWithDataColumn` (key, fk, value) VALUES + (101u, 1001, "Value4"), + (102u, 1001, "Value3"), + (103u, 1001, "Value2"), + (99u, 1000, "Value1"); + )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync()); + + auto query = Q1_(R"( + SELECT key, fk, value, ext_value FROM MultiShardIndexedWithDataColumn VIEW index + WHERE key > 98 + ORDER BY fk, value + LIMIT 2; + )"); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + auto result = session.ExecuteDataQuery( + query, + TTxControl::BeginTx().CommitTx(), + execSettings) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + AssertTableStats(result, "/Root/MultiShardIndexedWithDataColumn", { + .ExpectedReads = 2 // without push 
down ExpectedReads = 4 + }); + AssertTableStats(result, "/Root/MultiShardIndexedWithDataColumn/index/indexImplTable", { + .ExpectedReads = 4 + }); + CompareYson(R"([ + [[99u];[1000u];["Value1"];#]; + [[103u];[1001u];["Value2"];#] + ])", FormatResultSetYson(result.GetResultSet(0))); + } } } diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 5531bafd55..b4a4f34075 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -4152,6 +4152,151 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda .ExpectedReads = 1 }); } + Y_UNIT_TEST(IndexTopSortPushDown) { + TKikimrRunner kikimr; + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + CreateSampleTablesWithIndex(session, false /*populateTables*/); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + auto result = session.ExecuteDataQuery(Q1_(R"( + REPLACE INTO `/Root/SecondaryWithDataColumns` (Key, Index2, Value) VALUES + ("0", "0", "Value2"), + ("1", "0", "Value1"), + ("2", "1", "Value0"), + ("3", "1", "Value0"), + ("4", "1", "Value0"); + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + { + // Use top without sort if ORDER BY is index key columns + result = session.ExecuteDataQuery(Q1_(R"( + SELECT Index2, Key, Value, ExtPayload FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Index2, Key + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 2 + }); + AssertTableStats(result, 
"/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 2 + }); + CompareYson(R"([ + [["0"];["0"];["Value2"];#]; + [["0"];["1"];["Value1"];#] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + { + // Use top without sort if ORDER BY is prefix of index key columns + result = session.ExecuteDataQuery(Q1_(R"( + SELECT Index2, Key, Value, ExtPayload FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Index2 + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 2 + }); + AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 2 + }); + } + { + // Use push down for top sort if ORDER BY exist column from idex data columns + result = session.ExecuteDataQuery(Q1_(R"( + SELECT Index2, Key, Value, ExtPayload FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Index2, Key, Value + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 2 + }); + AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 5 + }); + } + { + // Use push down for top sort if ORDER BY exist column from idex data columns + result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Index2, Value + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 2 + }); + AssertTableStats(result, 
"/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 5 + }); + } + { + // Use push down for top sort if all columns in ORDER BY are in data columns + result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Value + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 2 + }); + AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 5 + }); + } + { + // Use push down for top sort if columns in ORDER BY in wrong order + result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Key, Index2, Value + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 2 + }); + AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 5 + }); + } + { + // Don't use push down for top sort if columns in ORDER BY in wrong order + result = session.ExecuteDataQuery(Q1_(R"( + SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Index2, Value, Key + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 2 + }); + AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 5 + }); + } + { + // Don't use push down if ORDER BY exists column not from index + result = session.ExecuteDataQuery(Q1_(R"( + 
SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index + ORDER BY Index2, Key, Value, ExtPayload + LIMIT 2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + AssertTableStats(result, "/Root/SecondaryWithDataColumns", { + .ExpectedReads = 5 + }); + AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", { + .ExpectedReads = 5 + }); + } + } Y_UNIT_TEST(UpdateOnReadColumns) { { // Check that keys from non involved index are not in read columns |