diff options
author | ulya-sidorina <yulia@ydb.tech> | 2022-12-22 12:24:11 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2022-12-22 12:24:11 +0300 |
commit | dcc1ffc26fabcd374f52f45b980499d00fe5a00c (patch) | |
tree | 4e17a879bf7cef6e97ac51cca88e7ab54447e08f | |
parent | ed846a1194eb88e712a28e36b2f6fbbfbe801d17 (diff) | |
download | ydb-dcc1ffc26fabcd374f52f45b980499d00fe5a00c.tar.gz |
lookup points using StreamLookup for scans
fix(kqp): lookup points using StreamLookup for scans
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp | 27 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 59 |
2 files changed, 81 insertions, 5 deletions
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index ec9857f580..84739f9e05 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -219,7 +219,8 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T fetches.reserve(lookup.GetKeyRanges().size()); for (auto& keyRange : lookup.GetKeyRanges()) { - bool useLookup = false; + bool useDataQueryLookup = false; + bool useScanQueryLookup = false; if (onlyPointRanges && !IsPointPrefix(keyRange)) { return node; } @@ -229,11 +230,13 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T // NOTE: Use more efficient full key lookup implementation in datashard. // Consider using lookup for partial keys as well once better constant folding // is available, currently it can introduce redundant compute stage. - useLookup = kqpCtx.IsDataQuery() && isFullKey; + useDataQueryLookup = kqpCtx.IsDataQuery() && isFullKey; + useScanQueryLookup = kqpCtx.IsScanQuery() && isFullKey + && kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(); } TMaybeNode<TExprBase> readInput; - if (useLookup) { + if (useDataQueryLookup) { auto lookupKeys = BuildEquiRangeLookup(keyRange, tableDesc, read.Pos(), ctx); if (indexName) { @@ -258,6 +261,24 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T .Done(); } } + } else if (useScanQueryLookup) { + YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()); + auto lookupKeys = BuildEquiRangeLookup(keyRange, tableDesc, read.Pos(), ctx); + + if (indexName) { + readInput = Build<TKqlStreamLookupIndex>(ctx, read.Pos()) + .Table(read.Table()) + .LookupKeys(lookupKeys) + .Columns(read.Columns()) + .Index(indexName.Cast()) + .Done(); + } else { + readInput = Build<TKqlStreamLookupTable>(ctx, read.Pos()) + .Table(read.Table()) + .LookupKeys(lookupKeys) + .Columns(read.Columns()) + .Done(); + } } else { auto keyRangeExpr = BuildKeyRangeExpr(keyRange, tableDesc, node.Pos(), ctx); diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 661b216052..a1429249a1 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -1158,7 +1158,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT(res.QueryStats); UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1); + // TODO: KIKIMR-16691 (add stats for sream lookup) + //UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1); } // complex key @@ -1180,7 +1181,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT(res.QueryStats); UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1); + // TODO: KIKIMR-16691 (add stats for sream lookup) + //UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).affected_shards(), 1); } } @@ -2122,6 +2124,59 @@ Y_UNIT_TEST_SUITE(KqpScan) { } } + Y_UNIT_TEST(StreamLookupByFullPk) { + TKikimrRunner kikimr; + auto db = kikimr.GetTableClient(); + CreateSampleTables(kikimr); + + { + kikimr.GetTestClient().CreateTable("/Root", R"( + Name: "TestTable" + Columns { Name: "Key1", Type: "Uint64" } + Columns { Name: "Key2", Type: "Uint64" } + Columns { Name: "Value", Type: "String" } + KeyColumnNames: ["Key1", "Key2"] + SplitBoundary { + KeyPrefix { + Tuple { Optional { Uint64: 2 } } + Tuple { Optional { Uint64: 20 } } + } + } + )"); + + auto result = db.CreateSession().GetValueSync().GetSession().ExecuteDataQuery(R"( + REPLACE INTO `/Root/TestTable` (Key1, Key2, Value) VALUES + (1u, 10, "Value1"), + (2u, 19, "Value2"), + (2u, 21, "Value2"), + (3u, 30, "Value3"), + (4u, 40, "Value4"), + (5u, 50, "Value5"); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + TStreamExecScanQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Full); + + auto it = db.StreamExecuteScanQuery(R"( + PRAGMA kikimr.OptEnablePredicateExtract = "false"; + SELECT * FROM `/Root/TestTable` WHERE Key1 = 1 AND Key2 = 10; + )", settings).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + auto result = CollectStreamResult(it); + CompareYson(R"([[[1u];[10u];["Value1"]]])", result.ResultSetYson); + UNIT_ASSERT(result.QueryStats); + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.QueryStats->query_plan(), &plan, true); + auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + UNIT_ASSERT(streamLookup.IsDefined()); + } + } + Y_UNIT_TEST(StreamLookupTryGetDataBeforeSchemeInitialization) { TPortManager tp; |