aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvpolka <vpolka@yandex-team.com>2023-10-13 12:45:15 +0300
committervpolka <vpolka@yandex-team.com>2023-10-13 13:06:39 +0300
commitecebb418cadc3c4481096b9faaa7d7775a96c572 (patch)
tree31bb9aa0ead55cb167fc4c9bf4a990232775251b
parent24eb86b163be9a818f095e0d99a4887fd8f6eec0 (diff)
downloadydb-ecebb418cadc3c4481096b9faaa7d7775a96c572.tar.gz
KIKIMR-15250: can push down top sort if not pk prefix columns in input exist in index data columns
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp56
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp48
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp145
3 files changed, 241 insertions, 8 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
index 6ba7a8e7d8..acb6b955e9 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
@@ -58,8 +58,8 @@ TCoAtomList MergeColumns(const NNodes::TCoAtomList& col1, const TVector<TString>
.Done();
}
-bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) {
- auto checkKey = [keySelector, &tableDesc, columns] (const TExprBase& key, ui32 index) {
+bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc) {
+ auto checkKey = [keySelector, &tableDesc] (const TExprBase& key, ui32 index) {
if (!key.Maybe<TCoMember>()) {
return false;
}
@@ -75,6 +75,42 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc
return false;
}
+ return true;
+ };
+
+ auto lambdaBody = keySelector.Body();
+ if (auto maybeTuple = lambdaBody.Maybe<TExprList>()) {
+ auto tuple = maybeTuple.Cast();
+ for (size_t i = 0; i < tuple.Size(); ++i) {
+ if (!checkKey(tuple.Item(i), i)) {
+ return false;
+ }
+ }
+ } else {
+ if (!checkKey(lambdaBody, 0)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool IsTableExistsKeySelector(NNodes::TCoLambda keySelector, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) {
+ auto checkKey = [keySelector, &tableDesc, columns] (const TExprBase& key) {
+ if (!key.Maybe<TCoMember>()) {
+ return false;
+ }
+
+ auto member = key.Cast<TCoMember>();
+ if (member.Struct().Raw() != keySelector.Args().Arg(0).Raw()) {
+ return false;
+ }
+
+ auto column = member.Name().StringValue();
+ if (!tableDesc.Metadata->Columns.contains(column)) {
+ return false;
+ }
+
if (columns) {
columns->emplace_back(std::move(column));
}
@@ -86,12 +122,12 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc
if (auto maybeTuple = lambdaBody.Maybe<TExprList>()) {
auto tuple = maybeTuple.Cast();
for (size_t i = 0; i < tuple.Size(); ++i) {
- if (!checkKey(tuple.Item(i), i)) {
+ if (!checkKey(tuple.Item(i))) {
return false;
}
}
} else {
- if (!checkKey(lambdaBody, 0)) {
+ if (!checkKey(lambdaBody)) {
return false;
}
}
@@ -99,8 +135,8 @@ bool IsKeySelectorPkPrefix(NNodes::TCoLambda keySelector, const TKikimrTableDesc
return true;
}
-bool CanPushTopSort(const TCoTopBase& node, const TKikimrTableDescription& tableDesc, TVector<TString>* columns) {
- return IsKeySelectorPkPrefix(node.KeySelectorLambda(), tableDesc, columns);
+bool CanPushTopSort(const TCoTopBase& node, const TKikimrTableDescription& indexDesc, TVector<TString>* columns) {
+ return IsTableExistsKeySelector(node.KeySelectorLambda(), indexDesc, columns);
}
struct TReadMatch {
@@ -267,7 +303,9 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx,
structMembers.push_back(member);
}
- readIndexTable = Build<TCoMap>(ctx, read.Pos())
+ // We need to save order for TopSort, otherwise TopSort will be replaced by Top during optimization (https://st.yandex-team.ru/YQL-15415)
+ readIndexTable = Build<TCoMapBase>(ctx, read.Pos())
+ .CallableName(readIndexTable.Maybe<TCoTopSort>() ? TCoOrderedMap::CallableName() : TCoMap::CallableName())
.Input(readIndexTable)
.Lambda()
.Args({arg})
@@ -591,6 +629,8 @@ TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ct
return node;
}
+ bool needSort = node.Maybe<TCoTopSort>() && !IsKeySelectorPkPrefix(topBase.KeySelectorLambda(), indexDesc);
+
auto filter = [&](const TExprBase& in) mutable {
auto sortInput = in;
@@ -603,7 +643,7 @@ TExprBase KqpRewriteTopSortOverIndexRead(const TExprBase& node, TExprContext& ct
}
auto newTop = Build<TCoTopBase>(ctx, node.Pos())
- .CallableName(node.Ref().Content())
+ .CallableName(needSort ? TCoTopSort::CallableName() : TCoTop::CallableName())
.Input(sortInput)
.KeySelectorLambda(ctx.DeepCopyLambda(topBase.KeySelectorLambda().Ref()))
.SortDirections(topBase.SortDirections())
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
index 208067a29c..25544832fe 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_multishard_ut.cpp
@@ -1763,6 +1763,54 @@ Y_UNIT_TEST_SUITE(KqpMultishardIndex) {
Y_UNIT_TEST(WriteIntoRenamingAsyncIndex) {
CheckWriteIntoRenamingIndex(true);
}
+
+ Y_UNIT_TEST_TWIN(CheckPushTopSort, StreamLookup) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(StreamLookup);
+
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ CreateTableWithMultishardIndexAndDataColumn(kikimr.GetTestClient());
+
+ AssertSuccessResult(session.ExecuteDataQuery(Q1_(R"(
+ UPSERT INTO `/Root/MultiShardIndexedWithDataColumn` (key, fk, value) VALUES
+ (101u, 1001, "Value4"),
+ (102u, 1001, "Value3"),
+ (103u, 1001, "Value2"),
+ (99u, 1000, "Value1");
+ )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync());
+
+ auto query = Q1_(R"(
+ SELECT key, fk, value, ext_value FROM MultiShardIndexedWithDataColumn VIEW index
+ WHERE key > 98
+ ORDER BY fk, value
+ LIMIT 2;
+ )");
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+ auto result = session.ExecuteDataQuery(
+ query,
+ TTxControl::BeginTx().CommitTx(),
+ execSettings)
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ AssertTableStats(result, "/Root/MultiShardIndexedWithDataColumn", {
+ .ExpectedReads = 2 // without push down ExpectedReads = 4
+ });
+ AssertTableStats(result, "/Root/MultiShardIndexedWithDataColumn/index/indexImplTable", {
+ .ExpectedReads = 4
+ });
+ CompareYson(R"([
+ [[99u];[1000u];["Value1"];#];
+ [[103u];[1001u];["Value2"];#]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
}
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 5531bafd55..b4a4f34075 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -4152,6 +4152,151 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
.ExpectedReads = 1
});
}
+ Y_UNIT_TEST(IndexTopSortPushDown) {
+ TKikimrRunner kikimr;
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ CreateSampleTablesWithIndex(session, false /*populateTables*/);
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ auto result = session.ExecuteDataQuery(Q1_(R"(
+ REPLACE INTO `/Root/SecondaryWithDataColumns` (Key, Index2, Value) VALUES
+ ("0", "0", "Value2"),
+ ("1", "0", "Value1"),
+ ("2", "1", "Value0"),
+ ("3", "1", "Value0"),
+ ("4", "1", "Value0");
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ {
+ // Use top without sort if ORDER BY is index key columns
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT Index2, Key, Value, ExtPayload FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Index2, Key
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 2
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 2
+ });
+ CompareYson(R"([
+ [["0"];["0"];["Value2"];#];
+ [["0"];["1"];["Value1"];#]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ {
+ // Use top without sort if ORDER BY is prefix of index key columns
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT Index2, Key, Value, ExtPayload FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Index2
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 2
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 2
+ });
+ }
+ {
+ // Use push down for top sort if ORDER BY exist column from idex data columns
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT Index2, Key, Value, ExtPayload FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Index2, Key, Value
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 2
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 5
+ });
+ }
+ {
+ // Use push down for top sort if ORDER BY exist column from idex data columns
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Index2, Value
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 2
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 5
+ });
+ }
+ {
+ // Use push down for top sort if all columns in ORDER BY are in data columns
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Value
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 2
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 5
+ });
+ }
+ {
+ // Use push down for top sort if columns in ORDER BY in wrong order
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Key, Index2, Value
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 2
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 5
+ });
+ }
+ {
+ // Don't use push down for top sort if columns in ORDER BY in wrong order
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Index2, Value, Key
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 2
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 5
+ });
+ }
+ {
+ // Don't use push down if ORDER BY exists column not from index
+ result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT * FROM `/Root/SecondaryWithDataColumns` VIEW Index
+ ORDER BY Index2, Key, Value, ExtPayload
+ LIMIT 2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns", {
+ .ExpectedReads = 5
+ });
+ AssertTableStats(result, "/Root/SecondaryWithDataColumns/Index/indexImplTable", {
+ .ExpectedReads = 5
+ });
+ }
+ }
Y_UNIT_TEST(UpdateOnReadColumns) {
{
// Check that keys from non involved index are not in read columns