about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Iuliia Sidorina <ulya.sidorina@gmail.com> 2022-07-07 15:20:39 +0300
committer Iuliia Sidorina <ulya.sidorina@gmail.com> 2022-07-07 15:20:39 +0300
commit f099bc8c4dab3931f0fcd6ccb23177477c92118b (patch)
tree b7f319cdb575f609c4d825e8cd834e08bdc15ed2
parent 0da40bdfddf6fdcaf09c2e88028611ccbf0a9494 (diff)
download ydb-f099bc8c4dab3931f0fcd6ccb23177477c92118b.tar.gz
KIKIMR-13295: support secondary indices for scan query
feature(kqp): support secondary indices for scan query ref:8b4c2bd189da60ecf86bcaaa36274c525e4ae392
-rw-r--r-- ydb/core/kqp/expr_nodes/kqp_expr_nodes.json 14
-rw-r--r-- ydb/core/kqp/opt/kqp_opt_kql.cpp 6
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log.cpp 7
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp 5
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp 71
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp 14
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_rules.h 3
-rw-r--r-- ydb/core/kqp/prepare/kqp_type_ann.cpp 6
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp 1
-rw-r--r-- ydb/core/kqp/ut/kqp_scan_ut.cpp 58
10 files changed, 149 insertions, 36 deletions
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 724c7d9b22..66d86ab9f1 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -136,9 +136,9 @@
"Match": {"Type": "Callable", "Name": "KqlLookupTable"}
},
{
- "Name": "TKqlLookupIndex",
+ "Name": "TKqlLookupIndexBase",
"Base": "TKqlLookupTableBase",
- "Match": {"Type": "Callable", "Name": "KqlLookupIndex"},
+ "Match": {"Type": "CallableBase"},
"Children": [
{"Index": 3, "Name": "Index", "Type": "TCoAtom"}
]
@@ -154,6 +154,16 @@
"Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"}
},
{
+ "Name": "TKqlLookupIndex",
+ "Base": "TKqlLookupIndexBase",
+ "Match": {"Type": "Callable", "Name": "KqlLookupIndex"}
+ },
+ {
+ "Name": "TKqlStreamLookupIndex",
+ "Base": "TKqlLookupIndexBase",
+ "Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"}
+ },
+ {
"Name": "TKqlTableEffect",
"Base": "TExprBase",
"Match": {"Type": "CallableBase"},
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index e36dbe7c7f..709ae12b00 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -532,12 +532,6 @@ TExprNode::TPtr HandleReadTable(const TKiReadTable& read, TExprContext& ctx, con
return nullptr;
}
- if (kqpCtx->IsScanQuery()) {
- const TString err = "Secondary index is not supported for ScanQuery";
- ctx.AddError(YqlIssue(ctx.GetPosition(read.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err));
- return nullptr;
- }
-
auto [metadata, state] = tableData.Metadata->GetIndexMetadata(indexName);
YQL_ENSURE(metadata, "unable to find metadata for index: " << indexName);
YQL_ENSURE(state == TIndexDescription::EIndexState::Ready
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index e696d5a50c..03a101d0e0 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -48,6 +48,7 @@ public:
AddHandler(1, &TKqlReadTableIndex::Match, HNDL(RewriteIndexRead));
AddHandler(1, &TKqlLookupIndex::Match, HNDL(RewriteLookupIndex));
+ AddHandler(1, &TKqlStreamLookupIndex::Match, HNDL(RewriteStreamLookupIndex));
AddHandler(2, &TKqlReadTableBase::Match, HNDL(ApplyExtractMembersToReadTable<true>));
AddHandler(2, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<true>));
@@ -144,6 +145,12 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> RewriteStreamLookupIndex(TExprBase node, TExprContext& ctx) {
+ TExprBase output = KqpRewriteStreamLookupIndex(node, ctx, KqpCtx);
+ DumpAppliedRule("RewriteStreamLookupIndex", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx);
DumpAppliedRule("DeleteOverLookup", node.Ptr(), output.Ptr(), ctx);
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
index 24d63381c7..1234ee976e 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
@@ -168,10 +168,11 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx,
return node;
}
- if (auto maybeIndexLookup = lookup.Maybe<TKqlLookupIndex>()) {
+ if (auto maybeIndexLookup = lookup.Maybe<TKqlLookupIndexBase>()) {
auto indexLookup = maybeIndexLookup.Cast();
- return Build<TKqlLookupIndex>(ctx, lookup.Pos())
+ return Build<TKqlLookupIndexBase>(ctx, lookup.Pos())
+ .CallableName(indexLookup.CallableName())
.Table(indexLookup.Table())
.LookupKeys(indexLookup.LookupKeys())
.Columns(usedColumns.Cast())
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
index 955abe4e98..10e142a848 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
@@ -109,17 +109,40 @@ TExprBase DoRewriteIndexRead(const TKqlReadTableIndex& read, TExprContext& ctx,
} // namespace
TExprBase KqpRewriteIndexRead(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
- if (!kqpCtx.IsDataQuery()) {
- return node;
- }
-
if (auto maybeIndexRead = node.Maybe<TKqlReadTableIndex>()) {
auto indexRead = maybeIndexRead.Cast();
const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, indexRead.Table().Path());
const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(TString(indexRead.Index().Value()));
- return DoRewriteIndexRead(indexRead, ctx, tableDesc, indexMeta, {});
+ if (kqpCtx.IsDataQuery()) {
+ return DoRewriteIndexRead(indexRead, ctx, tableDesc, indexMeta, {});
+ }
+
+ const bool needDataRead = CheckIndexCovering(indexRead, indexMeta);
+ if (!needDataRead) {
+ return Build<TKqlReadTable>(ctx, indexRead.Pos())
+ .Table(BuildTableMeta(*indexMeta, indexRead.Pos(), ctx))
+ .Range(indexRead.Range())
+ .Columns(indexRead.Columns())
+ .Settings(indexRead.Settings())
+ .Done();
+ }
+
+ auto keyColumnsList = BuildKeyColumnsList(tableDesc, indexRead.Pos(), ctx);
+
+ TExprBase readIndexTable = Build<TKqlReadTable>(ctx, indexRead.Pos())
+ .Table(BuildTableMeta(*indexMeta, indexRead.Pos(), ctx))
+ .Range(indexRead.Range())
+ .Columns(keyColumnsList)
+ .Settings(indexRead.Settings())
+ .Done();
+
+ return Build<TKqlStreamLookupTable>(ctx, indexRead.Pos())
+ .Table(indexRead.Table())
+ .LookupKeys(readIndexTable.Ptr())
+ .Columns(indexRead.Columns())
+ .Done();
}
return node;
@@ -164,6 +187,44 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
return node;
}
+TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
+ if (!kqpCtx.IsScanQuery()) {
+ return node;
+ }
+
+ if (auto maybeStreamLookupIndex = node.Maybe<TKqlStreamLookupIndex>()) {
+ auto streamLookupIndex = maybeStreamLookupIndex.Cast();
+
+ const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
+ const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
+
+ const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta);
+ if (!needDataRead) {
+ return Build<TKqlStreamLookupTable>(ctx, node.Pos())
+ .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
+ .LookupKeys(streamLookupIndex.LookupKeys())
+ .Columns(streamLookupIndex.Columns())
+ .Done();
+ }
+
+ auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx);
+
+ TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
+ .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
+ .LookupKeys(streamLookupIndex.LookupKeys())
+ .Columns(keyColumnsList)
+ .Done();
+
+ return Build<TKqlStreamLookupTable>(ctx, node.Pos())
+ .Table(streamLookupIndex.Table())
+ .LookupKeys(lookupIndexTable.Ptr())
+ .Columns(streamLookupIndex.Columns())
+ .Done();
+ }
+
+ return node;
+}
+
// The index and main table have same number of rows, so we can push a copy of TCoTopSort or TCoTake
// through TKqlLookupTable.
// The simplest way is to match TopSort or Take over TKqlReadTableIndex.
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index c0be2b4dc5..32bdb23e3c 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -75,7 +75,19 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const T
const TKqpOptimizeContext& kqpCtx)
{
if (kqpCtx.IsScanQuery()) {
- YQL_ENSURE(false, "StreamLookupIndex is not implemented");
+ YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled");
+ return Build<TKqlStreamLookupIndex>(ctx, pos)
+ .Table(read.Table())
+ .LookupKeys<TCoSkipNullMembers>()
+ .Input(keysToLookup)
+ .Members()
+ .Add(lookupNames)
+ .Build()
+ .Build()
+ .Columns(read.Columns())
+ .Index()
+ .Build(indexName)
+ .Done();
}
return Build<TKqlLookupIndex>(ctx, pos)
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
index e61e3cc4af..db474ad71d 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
@@ -38,6 +38,9 @@ NYql::NNodes::TExprBase KqpRewriteIndexRead(const NYql::NNodes::TExprBase& node,
NYql::NNodes::TExprBase KqpRewriteLookupIndex(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
+NYql::NNodes::TExprBase KqpRewriteStreamLookupIndex(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
+ const TKqpOptimizeContext& kqpCtx);
+
NYql::NNodes::TExprBase KqpRewriteTopSortOverIndexRead(const NYql::NNodes::TExprBase& node, NYql::TExprContext&,
const TKqpOptimizeContext& kqpCtx);
diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp
index 37b3bd99bb..965efb09c0 100644
--- a/ydb/core/kqp/prepare/kqp_type_ann.cpp
+++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp
@@ -320,7 +320,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx,
TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns)
{
- if (!EnsureArgsCount(*node, TKqlLookupIndex::Match(node.Get()) ? 4 : 3, ctx)) {
+ if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) {
return TStatus::Error;
}
@@ -368,8 +368,8 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons
auto structType = lookupType->Cast<TStructExprType>();
ui32 keyColumnsCount = 0;
- if (TKqlLookupIndex::Match(node.Get())) {
- auto index = node->Child(TKqlLookupIndex::idx_Index);
+ if (TKqlLookupIndexBase::Match(node.Get())) {
+ auto index = node->Child(TKqlLookupIndexBase::idx_Index);
if (!EnsureAtom(*index, ctx)) {
return TStatus::Error;
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 4777fd4d71..0ff947a52f 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -488,6 +488,7 @@ private:
entry.TableId = TableId;
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
+ entry.ShowPrivatePath = true;
request->ResultSet.emplace_back(entry);
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index cc9af0f917..35d79981cf 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1680,7 +1680,11 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
Y_UNIT_TEST_TWIN(SecondaryIndex, UseSessionActor) {
- auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg());
+ auto settings = TKikimrSettings()
+ .SetEnableKqpSessionActor(UseSessionActor)
+ .SetEnableKqpScanQueryStreamLookup(true);
+
+ TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1698,24 +1702,44 @@ Y_UNIT_TEST_SUITE(KqpScan) {
UNIT_ASSERT_C(itOk.IsSuccess(), itOk.GetIssues().ToString());
CompareYson(R"([[["Payload1"]];[["Payload2"]]])", StreamResultToYson(itOk));
- // should NOT work with explicit view
- auto itIndex = db.StreamExecuteScanQuery(R"(
- PRAGMA AnsiInForEmptyOrNullableItemsCollections;
- SELECT Value
- FROM `/Root/SecondaryComplexKeys` VIEW Index
- WHERE (Fk1, Fk2) IN AsList((1, "Fk1"), (2, "Fk2"), (42, "Fk5"), (Null, "FkNull"))
- ORDER BY Value;
- )").GetValueSync();
+ { // Index contains required columns
+ auto itIndex = db.StreamExecuteScanQuery(R"(
+ PRAGMA AnsiInForEmptyOrNullableItemsCollections;
+ SELECT Fk2
+ FROM `/Root/SecondaryComplexKeys` VIEW Index
+ WHERE Fk1 IN [1, 2, 3, 4, 5]
+ ORDER BY Fk2;
+ )").GetValueSync();
- NYdb::EStatus status = NYdb::EStatus::STATUS_UNDEFINED;
- for (;;) {
- auto streamPart = itIndex.ReadNext().GetValueSync();
- if (!streamPart.IsSuccess()) {
- status = streamPart.GetStatus();
- break;
- }
+ UNIT_ASSERT_C(itIndex.IsSuccess(), itIndex.GetIssues().ToString());
+ CompareYson(R"([[["Fk1"]];[["Fk2"]];[["Fk5"]]])", StreamResultToYson(itIndex));
+ }
+
+ {
+ auto itIndex = db.StreamExecuteScanQuery(R"(
+ PRAGMA AnsiInForEmptyOrNullableItemsCollections;
+ SELECT Value
+ FROM `/Root/SecondaryComplexKeys` VIEW Index
+ WHERE Fk1 >= 1 AND Fk1 < 5
+ ORDER BY Value;
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(itIndex.IsSuccess(), itIndex.GetIssues().ToString());
+ CompareYson(R"([[["Payload1"]];[["Payload2"]]])", StreamResultToYson(itIndex));
+ }
+
+ {
+ auto itIndex = db.StreamExecuteScanQuery(R"(
+ PRAGMA AnsiInForEmptyOrNullableItemsCollections;
+ SELECT Value
+ FROM `/Root/SecondaryComplexKeys` VIEW Index
+ WHERE (Fk1, Fk2) IN AsList((1, "Fk1"), (2, "Fk2"), (42, "Fk5"), (Null, "FkNull"))
+ ORDER BY Value;
+ )").GetValueSync();
+
+ UNIT_ASSERT_C(itIndex.IsSuccess(), itIndex.GetIssues().ToString());
+ CompareYson(R"([[["Payload1"]];[["Payload2"]]])", StreamResultToYson(itIndex));
}
- UNIT_ASSERT_VALUES_EQUAL_C(status, EStatus::BAD_REQUEST, "ScanQuery with explicit index should fail");
}
Y_UNIT_TEST_TWIN(BoolFlag, UseSessionActor) {