diff options
author | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-07-07 15:20:39 +0300 |
---|---|---|
committer | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-07-07 15:20:39 +0300 |
commit | f099bc8c4dab3931f0fcd6ccb23177477c92118b (patch) | |
tree | b7f319cdb575f609c4d825e8cd834e08bdc15ed2 | |
parent | 0da40bdfddf6fdcaf09c2e88028611ccbf0a9494 (diff) | |
download | ydb-f099bc8c4dab3931f0fcd6ccb23177477c92118b.tar.gz |
KIKIMR-13295: support secondary indices for scan query
feature(kqp): support secondary indices for scan query
ref:8b4c2bd189da60ecf86bcaaa36274c525e4ae392
-rw-r--r-- | ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 14 | ||||
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_kql.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp | 71 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_rules.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/prepare/kqp_type_ann.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_scan_ut.cpp | 58 |
10 files changed, 149 insertions, 36 deletions
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 724c7d9b22..66d86ab9f1 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -136,9 +136,9 @@ "Match": {"Type": "Callable", "Name": "KqlLookupTable"} }, { - "Name": "TKqlLookupIndex", + "Name": "TKqlLookupIndexBase", "Base": "TKqlLookupTableBase", - "Match": {"Type": "Callable", "Name": "KqlLookupIndex"}, + "Match": {"Type": "CallableBase"}, "Children": [ {"Index": 3, "Name": "Index", "Type": "TCoAtom"} ] @@ -154,6 +154,16 @@ "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"} }, { + "Name": "TKqlLookupIndex", + "Base": "TKqlLookupIndexBase", + "Match": {"Type": "Callable", "Name": "KqlLookupIndex"} + }, + { + "Name": "TKqlStreamLookupIndex", + "Base": "TKqlLookupIndexBase", + "Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"} + }, + { "Name": "TKqlTableEffect", "Base": "TExprBase", "Match": {"Type": "CallableBase"}, diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index e36dbe7c7f..709ae12b00 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -532,12 +532,6 @@ TExprNode::TPtr HandleReadTable(const TKiReadTable& read, TExprContext& ctx, con return nullptr; } - if (kqpCtx->IsScanQuery()) { - const TString err = "Secondary index is not supported for ScanQuery"; - ctx.AddError(YqlIssue(ctx.GetPosition(read.Pos()), TIssuesIds::KIKIMR_BAD_REQUEST, err)); - return nullptr; - } - auto [metadata, state] = tableData.Metadata->GetIndexMetadata(indexName); YQL_ENSURE(metadata, "unable to find metadata for index: " << indexName); YQL_ENSURE(state == TIndexDescription::EIndexState::Ready diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index e696d5a50c..03a101d0e0 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -48,6 +48,7 @@ public: AddHandler(1, &TKqlReadTableIndex::Match, HNDL(RewriteIndexRead)); AddHandler(1, &TKqlLookupIndex::Match, HNDL(RewriteLookupIndex)); + AddHandler(1, &TKqlStreamLookupIndex::Match, HNDL(RewriteStreamLookupIndex)); AddHandler(2, &TKqlReadTableBase::Match, HNDL(ApplyExtractMembersToReadTable<true>)); AddHandler(2, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<true>)); @@ -144,6 +145,12 @@ protected: return output; } + TMaybeNode<TExprBase> RewriteStreamLookupIndex(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpRewriteStreamLookupIndex(node, ctx, KqpCtx); + DumpAppliedRule("RewriteStreamLookupIndex", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx) { TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx); DumpAppliedRule("DeleteOverLookup", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp index 24d63381c7..1234ee976e 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp @@ -168,10 +168,11 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx, return node; } - if (auto maybeIndexLookup = lookup.Maybe<TKqlLookupIndex>()) { + if (auto maybeIndexLookup = lookup.Maybe<TKqlLookupIndexBase>()) { auto indexLookup = maybeIndexLookup.Cast(); - return Build<TKqlLookupIndex>(ctx, lookup.Pos()) + return Build<TKqlLookupIndexBase>(ctx, lookup.Pos()) + .CallableName(indexLookup.CallableName()) .Table(indexLookup.Table()) .LookupKeys(indexLookup.LookupKeys()) .Columns(usedColumns.Cast()) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index 955abe4e98..10e142a848 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -109,17 +109,40 @@ TExprBase DoRewriteIndexRead(const TKqlReadTableIndex& read, TExprContext& ctx, } // namespace TExprBase KqpRewriteIndexRead(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { - if (!kqpCtx.IsDataQuery()) { - return node; - } - if (auto maybeIndexRead = node.Maybe<TKqlReadTableIndex>()) { auto indexRead = maybeIndexRead.Cast(); const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, indexRead.Table().Path()); const auto& [indexMeta, _ ] = tableDesc.Metadata->GetIndexMetadata(TString(indexRead.Index().Value())); - return DoRewriteIndexRead(indexRead, ctx, tableDesc, indexMeta, {}); + if (kqpCtx.IsDataQuery()) { + return DoRewriteIndexRead(indexRead, ctx, tableDesc, indexMeta, {}); + } + + const bool needDataRead = CheckIndexCovering(indexRead, indexMeta); + if (!needDataRead) { + return Build<TKqlReadTable>(ctx, indexRead.Pos()) + .Table(BuildTableMeta(*indexMeta, indexRead.Pos(), ctx)) + .Range(indexRead.Range()) + .Columns(indexRead.Columns()) + .Settings(indexRead.Settings()) + .Done(); + } + + auto keyColumnsList = BuildKeyColumnsList(tableDesc, indexRead.Pos(), ctx); + + TExprBase readIndexTable = Build<TKqlReadTable>(ctx, indexRead.Pos()) + .Table(BuildTableMeta(*indexMeta, indexRead.Pos(), ctx)) + .Range(indexRead.Range()) + .Columns(keyColumnsList) + .Settings(indexRead.Settings()) + .Done(); + + return Build<TKqlStreamLookupTable>(ctx, indexRead.Pos()) + .Table(indexRead.Table()) + .LookupKeys(readIndexTable.Ptr()) + .Columns(indexRead.Columns()) + .Done(); } return node; @@ -164,6 +187,44 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const return node; } +TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + if (!kqpCtx.IsScanQuery()) { + return node; + } + + if (auto maybeStreamLookupIndex = node.Maybe<TKqlStreamLookupIndex>()) { + auto streamLookupIndex = maybeStreamLookupIndex.Cast(); + + const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path()); + const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue()); + + const bool needDataRead = CheckIndexCovering(streamLookupIndex, indexMeta); + if (!needDataRead) { + return Build<TKqlStreamLookupTable>(ctx, node.Pos()) + .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) + .LookupKeys(streamLookupIndex.LookupKeys()) + .Columns(streamLookupIndex.Columns()) + .Done(); + } + + auto keyColumnsList = BuildKeyColumnsList(tableDesc, streamLookupIndex.Pos(), ctx); + + TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos()) + .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) + .LookupKeys(streamLookupIndex.LookupKeys()) + .Columns(keyColumnsList) + .Done(); + + return Build<TKqlStreamLookupTable>(ctx, node.Pos()) + .Table(streamLookupIndex.Table()) + .LookupKeys(lookupIndexTable.Ptr()) + .Columns(streamLookupIndex.Columns()) + .Done(); + } + + return node; +} + // The index and main table have same number of rows, so we can push a copy of TCoTopSort or TCoTake // through TKqlLookupTable. // The simplest way is to match TopSort or Take over TKqlReadTableIndex. diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index c0be2b4dc5..32bdb23e3c 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -75,7 +75,19 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const T const TKqpOptimizeContext& kqpCtx) { if (kqpCtx.IsScanQuery()) { - YQL_ENSURE(false, "StreamLookupIndex is not implemented"); + YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled"); + return Build<TKqlStreamLookupIndex>(ctx, pos) + .Table(read.Table()) + .LookupKeys<TCoSkipNullMembers>() + .Input(keysToLookup) + .Members() + .Add(lookupNames) + .Build() + .Build() + .Columns(read.Columns()) + .Index() + .Build(indexName) + .Done(); } return Build<TKqlLookupIndex>(ctx, pos) diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h index e61e3cc4af..db474ad71d 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h @@ -38,6 +38,9 @@ NYql::NNodes::TExprBase KqpRewriteIndexRead(const NYql::NNodes::TExprBase& node, NYql::NNodes::TExprBase KqpRewriteLookupIndex(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpRewriteStreamLookupIndex(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpRewriteTopSortOverIndexRead(const NYql::NNodes::TExprBase& node, NYql::TExprContext&, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp index 37b3bd99bb..965efb09c0 100644 --- a/ydb/core/kqp/prepare/kqp_type_ann.cpp +++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp @@ -320,7 +320,7 @@ TStatus AnnotateReadTableRanges(const TExprNode::TPtr& node, TExprContext& ctx, TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, const TKikimrTablesData& tablesData, bool withSystemColumns) { - if (!EnsureArgsCount(*node, TKqlLookupIndex::Match(node.Get()) ? 4 : 3, ctx)) { + if (!EnsureArgsCount(*node, TKqlLookupIndexBase::Match(node.Get()) ? 4 : 3, ctx)) { return TStatus::Error; } @@ -368,8 +368,8 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons auto structType = lookupType->Cast<TStructExprType>(); ui32 keyColumnsCount = 0; - if (TKqlLookupIndex::Match(node.Get())) { - auto index = node->Child(TKqlLookupIndex::idx_Index); + if (TKqlLookupIndexBase::Match(node.Get())) { + auto index = node->Child(TKqlLookupIndexBase::idx_Index); if (!EnsureAtom(*index, ctx)) { return TStatus::Error; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 4777fd4d71..0ff947a52f 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -488,6 +488,7 @@ private: entry.TableId = TableId; entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; + entry.ShowPrivatePath = true; request->ResultSet.emplace_back(entry); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request)); diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index cc9af0f917..35d79981cf 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1680,7 +1680,11 @@ Y_UNIT_TEST_SUITE(KqpScan) { } Y_UNIT_TEST_TWIN(SecondaryIndex, UseSessionActor) { - auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor, {}, AppCfg()); + auto settings = TKikimrSettings() + .SetEnableKqpSessionActor(UseSessionActor) + .SetEnableKqpScanQueryStreamLookup(true); + + TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1698,24 +1702,44 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT_C(itOk.IsSuccess(), itOk.GetIssues().ToString()); CompareYson(R"([[["Payload1"]];[["Payload2"]]])", StreamResultToYson(itOk)); - // should NOT work with explicit view - auto itIndex = db.StreamExecuteScanQuery(R"( - PRAGMA AnsiInForEmptyOrNullableItemsCollections; - SELECT Value - FROM `/Root/SecondaryComplexKeys` VIEW Index - WHERE (Fk1, Fk2) IN AsList((1, "Fk1"), (2, "Fk2"), (42, "Fk5"), (Null, "FkNull")) - ORDER BY Value; - )").GetValueSync(); + { // Index contains required columns + auto itIndex = db.StreamExecuteScanQuery(R"( + PRAGMA AnsiInForEmptyOrNullableItemsCollections; + SELECT Fk2 + FROM `/Root/SecondaryComplexKeys` VIEW Index + WHERE Fk1 IN [1, 2, 3, 4, 5] + ORDER BY Fk2; + )").GetValueSync(); - NYdb::EStatus status = NYdb::EStatus::STATUS_UNDEFINED; - for (;;) { - auto streamPart = itIndex.ReadNext().GetValueSync(); - if (!streamPart.IsSuccess()) { - status = streamPart.GetStatus(); - break; - } + UNIT_ASSERT_C(itIndex.IsSuccess(), itIndex.GetIssues().ToString()); + CompareYson(R"([[["Fk1"]];[["Fk2"]];[["Fk5"]]])", StreamResultToYson(itIndex)); + } + + { + auto itIndex = db.StreamExecuteScanQuery(R"( + PRAGMA AnsiInForEmptyOrNullableItemsCollections; + SELECT Value + FROM `/Root/SecondaryComplexKeys` VIEW Index + WHERE Fk1 >= 1 AND Fk1 < 5 + ORDER BY Value; + )").GetValueSync(); + + UNIT_ASSERT_C(itIndex.IsSuccess(), itIndex.GetIssues().ToString()); + CompareYson(R"([[["Payload1"]];[["Payload2"]]])", StreamResultToYson(itIndex)); + } + + { + auto itIndex = db.StreamExecuteScanQuery(R"( + PRAGMA AnsiInForEmptyOrNullableItemsCollections; + SELECT Value + FROM `/Root/SecondaryComplexKeys` VIEW Index + WHERE (Fk1, Fk2) IN AsList((1, "Fk1"), (2, "Fk2"), (42, "Fk5"), (Null, "FkNull")) + ORDER BY Value; + )").GetValueSync(); + + UNIT_ASSERT_C(itIndex.IsSuccess(), itIndex.GetIssues().ToString()); + CompareYson(R"([[["Payload1"]];[["Payload2"]]])", StreamResultToYson(itIndex)); } - UNIT_ASSERT_VALUES_EQUAL_C(status, EStatus::BAD_REQUEST, "ScanQuery with explicit index should fail"); } Y_UNIT_TEST_TWIN(BoolFlag, UseSessionActor) { |