diff options
author | ssmike <ssmike@ydb.tech> | 2023-08-03 14:09:04 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-08-03 14:09:04 +0300 |
commit | 3d60ef65e18a3b25c2d163969485c15b09bac950 (patch) | |
tree | 41bf1dd839e110d05467b1c0312bb54917be4928 | |
parent | c6a76f7650e817b0054ab46844c982d8a9903b8b (diff) | |
download | ydb-3d60ef65e18a3b25c2d163969485c15b09bac950.tar.gz |
Fix iterator reads
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 20 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp | 36 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 20 | ||||
-rw-r--r-- | ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_sort_ut.cpp | 38 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_split_ut.cpp | 19 |
8 files changed, 128 insertions, 49 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 8731b9ad18e..275a1c68726 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -335,23 +335,23 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet auto newShard = TShardState(partition.ShardId, ++ScansCounter); for (ui64 j = i; j < state.Ranges.size(); ++j) { - CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state.Ranges[j].ToTableRange(), tr) - << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)); - - auto intersection = Intersect(KeyColumnTypes, partitionRange, state.Ranges[j].ToTableRange()); - - if (!intersection.IsEmptyRange(KeyColumnTypes)) { + auto comparison = CompareRanges(partitionRange, state.Ranges[j].ToTableRange(), KeyColumnTypes); + CA_LOG_D("Compare range #" << j << " " << DebugPrintRange(KeyColumnTypes, state.Ranges[j].ToTableRange(), tr) + << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr) + << " : " << comparison); + + if (comparison > 0) { + continue; + } else if (comparison == 0) { + auto intersection = Intersect(KeyColumnTypes, partitionRange, state.Ranges[j].ToTableRange()); CA_LOG_D("Add range to new shardId: " << partition.ShardId << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr)); newShard.Ranges.emplace_back(TSerializedTableRange(intersection)); } else { - CA_LOG_D("empty intersection"); - if (j > i) { - i = j - 1; - } break; } + i = j; } if (!newShard.Ranges.empty()) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index cfdf9c94cc2..80756a76a62 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -846,6 +846,12 @@ protected: if (source.GetSequentialInFlightShards()) { auto [startShard, shardInfo] = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + if (Stats) { + THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv()); + for (auto& [shardId, _] : partitions) { + Stats->AffectedShards.insert(shardId); + } + } if (shardInfo.KeyReadRanges) { addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards()); return Nothing(); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp index 1e9b774c0a9..56137612996 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp @@ -81,6 +81,7 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim auto settings = TKqpReadTableSettings::Parse(matched->Settings); auto selectColumns = matched->Columns; TVector<TCoAtom> skipNullColumns; + TExprNode::TPtr limit; if (settings.SkipNullKeys) { THashSet<TString> seenColumns; TVector<TCoAtom> columns; @@ -99,10 +100,13 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim columns.push_back(atom); } } - + matched->Columns = Build<TCoAtomList>(ctx, matched->Columns.Pos()).Add(columns).Done(); settings.SkipNullKeys.clear(); + limit = settings.ItemsLimit; + settings.ItemsLimit = nullptr; + matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos()); } @@ -125,24 +129,36 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim TCoArgument arg{ctx.NewArgument(stage.Pos(), TStringBuilder() << "_kqp_source_arg")}; args.insert(args.begin(), arg); + + TExprNode::TPtr replaceExpr = + Build<TCoToFlow>(ctx, matched->Expr.Pos()) + .Input(arg) + .Done() + .Ptr(); + if (skipNullColumns) { - argReplaces[matched->Expr.Raw()] = + replaceExpr = Build<TCoExtractMembers>(ctx, node.Pos()) .Members(selectColumns) .Input<TCoSkipNullMembers>() - .Input<TCoToFlow>().Input(arg).Build() + .Input(replaceExpr) .Members().Add(skipNullColumns).Build() .Build() .Done().Ptr(); - } else { - argReplaces[matched->Expr.Raw()] = - Build<TCoToFlow>(ctx, matched->Expr.Pos()) - .Input(arg) - .Done() - .Ptr(); } - auto source = + if (limit) { + limit = ctx.ReplaceNodes(std::move(limit), argReplaces); + replaceExpr = + Build<TCoTake>(ctx, node.Pos()) + .Input(replaceExpr) + .Count(limit) + .Done().Ptr(); + } + + argReplaces[matched->Expr.Raw()] = replaceExpr; + + auto source = Build<TDqSource>(ctx, matched->Expr.Pos()) .Settings<TKqpReadRangesSourceSettings>() .Table(matched->Table) diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index fd12e5e80ec..09a78b55d8b 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -670,23 +670,23 @@ public: if (state->HasRanges()) { for (ui64 j = rangeIndex; j < ranges.size(); ++j) { - CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, ranges[j].ToTableRange(), tr) - << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)); - - auto intersection = Intersect(KeyColumnTypes, partitionRange, ranges[j].ToTableRange()); - - if (!intersection.IsEmptyRange(KeyColumnTypes)) { + auto comparison = CompareRanges(partitionRange, ranges[j].ToTableRange(), KeyColumnTypes); + CA_LOG_D("Compare range #" << j << " " << DebugPrintRange(KeyColumnTypes, ranges[j].ToTableRange(), tr) + << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr) + << " : " << comparison); + + if (comparison > 0) { + continue; + } else if (comparison == 0) { + auto intersection = Intersect(KeyColumnTypes, partitionRange, ranges[j].ToTableRange()); CA_LOG_D("Add range to new shardId: " << partition.ShardId << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr)); newShard->AddRange(TSerializedTableRange(intersection)); } else { - CA_LOG_D("empty intersection"); - if (j > rangeIndex) { - rangeIndex = j - 1; - } break; } + rangeIndex = j; } if (newShard->HasRanges()) { diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 627939bf818..24c55dd7b24 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -1306,10 +1306,14 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { } } - Y_UNIT_TEST(SecondaryIndexOrderBy) { + Y_UNIT_TEST_TWIN(SecondaryIndexOrderBy, SourceRead) { auto setting = NKikimrKqp::TKqpSetting(); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead); auto serverSettings = TKikimrSettings() - .SetKqpSettings({setting}); + .SetKqpSettings({setting}) + .SetAppConfig(appConfig); + TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1507,7 +1511,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { query) .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\""), result.GetAst()); + UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\"") || SourceRead, result.GetAst()); UNIT_ASSERT_C(!result.GetAst().Contains("'('\"Reverse\")"), result.GetAst()); } @@ -1533,7 +1537,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { query) .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\""), result.GetAst()); + UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\"") || SourceRead, result.GetAst()); UNIT_ASSERT_C(!result.GetAst().Contains("'('\"Reverse\")"), result.GetAst()); } @@ -1560,7 +1564,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { query) .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\""), result.GetAst()); + UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\"") || SourceRead, result.GetAst()); UNIT_ASSERT_C(!result.GetAst().Contains("'('\"Reverse\")"), result.GetAst()); } @@ -1602,10 +1606,13 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { } } - Y_UNIT_TEST(SecondaryIndexOrderBy2) { + Y_UNIT_TEST_TWIN(SecondaryIndexOrderBy2, SourceRead) { auto setting = NKikimrKqp::TKqpSetting(); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead); auto serverSettings = TKikimrSettings() - .SetKqpSettings({setting}); + .SetKqpSettings({setting}) + .SetAppConfig(appConfig); TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1705,7 +1712,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str()); - UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit"), result.GetAst()); + UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit") || SourceRead, result.GetAst()); UNIT_ASSERT_C(result.GetAst().Contains("'('\"Reverse\")"), result.GetAst()); } @@ -1752,7 +1759,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { .ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str()); - UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit"), result.GetAst()); + UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit") || SourceRead, result.GetAst()); UNIT_ASSERT_C(result.GetAst().Contains("'('\"Reverse\")"), result.GetAst()); } diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 7448bed6f0d..ed6774f98f9 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3245,8 +3245,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { CompareYson(R"([[["Value1"]]])", FormatResultSetYson(result.GetResultSet(0))); } - Y_UNIT_TEST(PagingNoPredicateExtract) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(PagingNoPredicateExtract, SourceRead) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig); + + TKikimrRunner kikimr{serverSettings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -3287,6 +3292,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { // Cerr << result.GetPlan() << Endl; + if (SourceRead) { + return; + } + NJson::TJsonValue plan; NJson::ReadJsonTree(result.GetPlan(), &plan, true); auto reads = plan["tables"][0]["reads"].GetArraySafe(); diff --git a/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp index 18c3cc235ba..a2a1bf8082b 100644 --- a/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp @@ -231,8 +231,13 @@ Y_UNIT_TEST_SUITE(KqpSort) { } } - Y_UNIT_TEST(ReverseRangeLimitOptimized) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(ReverseRangeLimitOptimized, SourceRead) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig); + + TKikimrRunner kikimr{serverSettings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -256,6 +261,9 @@ Y_UNIT_TEST_SUITE(KqpSort) { if (!node.IsDefined()) { node = FindPlanNodeByKv(plan, "Node Type", "Filter-TableRangeScan"); } + if (!node.IsDefined()) { + node = FindPlanNodeByKv(plan, "Node Type", "Limit-TableRangeScan"); + } UNIT_ASSERT_C(node.IsDefined(), result.GetPlan()); auto read = FindPlanNodeByKv(node, "Name", "TableRangeScan"); UNIT_ASSERT(read.IsDefined()); @@ -267,7 +275,7 @@ Y_UNIT_TEST_SUITE(KqpSort) { } else { UNIT_ASSERT(limit.IsDefined()); UNIT_ASSERT(limit.GetMapSafe().contains("Limit")); - UNIT_ASSERT_C(result.GetAst().Contains("'\"ItemsLimit\""), result.GetAst()); + UNIT_ASSERT_C(result.GetAst().Contains("'\"ItemsLimit\"") || SourceRead, result.GetAst()); } } @@ -439,8 +447,13 @@ Y_UNIT_TEST_SUITE(KqpSort) { } } - Y_UNIT_TEST(TopSortExprPk) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(TopSortExprPk, SourceRead) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig); + + TKikimrRunner kikimr{serverSettings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -460,7 +473,7 @@ Y_UNIT_TEST_SUITE(KqpSort) { auto result = session.ExplainDataQuery(query).GetValueSync(); result.GetIssues().PrintTo(Cerr); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_C(result.GetAst().Contains("ItemsLimit"), result.GetAst()); + UNIT_ASSERT_C(result.GetAst().Contains("ItemsLimit") || SourceRead, result.GetAst()); } { @@ -1139,8 +1152,13 @@ Y_UNIT_TEST_SUITE(KqpSort) { ])", FormatResultSetYson(result.GetResultSet(2))); } - Y_UNIT_TEST(UnionAllSortLimit) { - TKikimrRunner kikimr; + Y_UNIT_TEST_TWIN(UnionAllSortLimit, SourceRead) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig); + + TKikimrRunner kikimr{serverSettings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -1159,6 +1177,10 @@ Y_UNIT_TEST_SUITE(KqpSort) { NJson::TJsonValue plan; NJson::ReadJsonTree(result.GetPlan(), &plan, true); + if (SourceRead) { + return; + } + for (auto& read : plan["tables"][0]["reads"].GetArraySafe()) { UNIT_ASSERT(read.Has("limit")); UNIT_ASSERT_VALUES_EQUAL(read["limit"], "3"); diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index a04b3aaf2b9..45f9da8e54e 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -616,6 +616,25 @@ Y_UNIT_TEST_SUITE(KqpSplit) { s.AssertSuccess(); UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",103,302,402,502,703"); } + + Y_UNIT_TEST_SORT(IntersectionLosesRange, Order) { + TTestSetup s; + auto shards = s.Shards(); + + auto* shim = new TReadActorPipeCacheStub(); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); + shim->SetupCapture(0, 1); + s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition` where Key = 101 or (Key >= 202 and Key < 200+4) or (Key >= 701 and Key < 704)" + OrderBy(Order)); + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + s.Split(shards.at(0), 190); + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->SendCaptured(s.Runtime); + + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",101,202,203,701,702,703"); + } } |