aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-08-03 14:09:04 +0300
committerssmike <ssmike@ydb.tech>2023-08-03 14:09:04 +0300
commit3d60ef65e18a3b25c2d163969485c15b09bac950 (patch)
tree41bf1dd839e110d05467b1c0312bb54917be4928
parentc6a76f7650e817b0054ab46844c982d8a9903b8b (diff)
downloadydb-3d60ef65e18a3b25c2d163969485c15b09bac950.tar.gz
Fix iterator reads
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp20
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h6
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp36
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp20
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp25
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp13
-rw-r--r--ydb/core/kqp/ut/opt/kqp_sort_ut.cpp38
-rw-r--r--ydb/core/kqp/ut/scan/kqp_split_ut.cpp19
8 files changed, 128 insertions, 49 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index 8731b9ad18e..275a1c68726 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -335,23 +335,23 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet
auto newShard = TShardState(partition.ShardId, ++ScansCounter);
for (ui64 j = i; j < state.Ranges.size(); ++j) {
- CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state.Ranges[j].ToTableRange(), tr)
- << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
-
- auto intersection = Intersect(KeyColumnTypes, partitionRange, state.Ranges[j].ToTableRange());
-
- if (!intersection.IsEmptyRange(KeyColumnTypes)) {
+ auto comparison = CompareRanges(partitionRange, state.Ranges[j].ToTableRange(), KeyColumnTypes);
+ CA_LOG_D("Compare range #" << j << " " << DebugPrintRange(KeyColumnTypes, state.Ranges[j].ToTableRange(), tr)
+ << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)
+ << " : " << comparison);
+
+ if (comparison > 0) {
+ continue;
+ } else if (comparison == 0) {
+ auto intersection = Intersect(KeyColumnTypes, partitionRange, state.Ranges[j].ToTableRange());
CA_LOG_D("Add range to new shardId: " << partition.ShardId
<< ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
newShard.Ranges.emplace_back(TSerializedTableRange(intersection));
} else {
- CA_LOG_D("empty intersection");
- if (j > i) {
- i = j - 1;
- }
break;
}
+ i = j;
}
if (!newShard.Ranges.empty()) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index cfdf9c94cc2..80756a76a62 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -846,6 +846,12 @@ protected:
if (source.GetSequentialInFlightShards()) {
auto [startShard, shardInfo] = MakeVirtualTablePartition(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ if (Stats) {
+ THashMap<ui64, TShardInfo> partitions = PrunePartitions(GetTableKeys(), source, stageInfo, HolderFactory(), TypeEnv());
+ for (auto& [shardId, _] : partitions) {
+ Stats->AffectedShards.insert(shardId);
+ }
+ }
if (shardInfo.KeyReadRanges) {
addPartiton(startShard, {}, shardInfo, source.GetSequentialInFlightShards());
return Nothing();
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
index 1e9b774c0a9..56137612996 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
@@ -81,6 +81,7 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
auto settings = TKqpReadTableSettings::Parse(matched->Settings);
auto selectColumns = matched->Columns;
TVector<TCoAtom> skipNullColumns;
+ TExprNode::TPtr limit;
if (settings.SkipNullKeys) {
THashSet<TString> seenColumns;
TVector<TCoAtom> columns;
@@ -99,10 +100,13 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
columns.push_back(atom);
}
}
-
+
matched->Columns = Build<TCoAtomList>(ctx, matched->Columns.Pos()).Add(columns).Done();
settings.SkipNullKeys.clear();
+ limit = settings.ItemsLimit;
+ settings.ItemsLimit = nullptr;
+
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
}
@@ -125,24 +129,36 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
TCoArgument arg{ctx.NewArgument(stage.Pos(), TStringBuilder() << "_kqp_source_arg")};
args.insert(args.begin(), arg);
+
+ TExprNode::TPtr replaceExpr =
+ Build<TCoToFlow>(ctx, matched->Expr.Pos())
+ .Input(arg)
+ .Done()
+ .Ptr();
+
if (skipNullColumns) {
- argReplaces[matched->Expr.Raw()] =
+ replaceExpr =
Build<TCoExtractMembers>(ctx, node.Pos())
.Members(selectColumns)
.Input<TCoSkipNullMembers>()
- .Input<TCoToFlow>().Input(arg).Build()
+ .Input(replaceExpr)
.Members().Add(skipNullColumns).Build()
.Build()
.Done().Ptr();
- } else {
- argReplaces[matched->Expr.Raw()] =
- Build<TCoToFlow>(ctx, matched->Expr.Pos())
- .Input(arg)
- .Done()
- .Ptr();
}
- auto source =
+ if (limit) {
+ limit = ctx.ReplaceNodes(std::move(limit), argReplaces);
+ replaceExpr =
+ Build<TCoTake>(ctx, node.Pos())
+ .Input(replaceExpr)
+ .Count(limit)
+ .Done().Ptr();
+ }
+
+ argReplaces[matched->Expr.Raw()] = replaceExpr;
+
+ auto source =
Build<TDqSource>(ctx, matched->Expr.Pos())
.Settings<TKqpReadRangesSourceSettings>()
.Table(matched->Table)
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index fd12e5e80ec..09a78b55d8b 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -670,23 +670,23 @@ public:
if (state->HasRanges()) {
for (ui64 j = rangeIndex; j < ranges.size(); ++j) {
- CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, ranges[j].ToTableRange(), tr)
- << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
-
- auto intersection = Intersect(KeyColumnTypes, partitionRange, ranges[j].ToTableRange());
-
- if (!intersection.IsEmptyRange(KeyColumnTypes)) {
+ auto comparison = CompareRanges(partitionRange, ranges[j].ToTableRange(), KeyColumnTypes);
+ CA_LOG_D("Compare range #" << j << " " << DebugPrintRange(KeyColumnTypes, ranges[j].ToTableRange(), tr)
+ << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)
+ << " : " << comparison);
+
+ if (comparison > 0) {
+ continue;
+ } else if (comparison == 0) {
+ auto intersection = Intersect(KeyColumnTypes, partitionRange, ranges[j].ToTableRange());
CA_LOG_D("Add range to new shardId: " << partition.ShardId
<< ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
newShard->AddRange(TSerializedTableRange(intersection));
} else {
- CA_LOG_D("empty intersection");
- if (j > rangeIndex) {
- rangeIndex = j - 1;
- }
break;
}
+ rangeIndex = j;
}
if (newShard->HasRanges()) {
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 627939bf818..24c55dd7b24 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -1306,10 +1306,14 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
}
}
- Y_UNIT_TEST(SecondaryIndexOrderBy) {
+ Y_UNIT_TEST_TWIN(SecondaryIndexOrderBy, SourceRead) {
auto setting = NKikimrKqp::TKqpSetting();
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead);
auto serverSettings = TKikimrSettings()
- .SetKqpSettings({setting});
+ .SetKqpSettings({setting})
+ .SetAppConfig(appConfig);
+
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1507,7 +1511,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
query)
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\""), result.GetAst());
+ UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\"") || SourceRead, result.GetAst());
UNIT_ASSERT_C(!result.GetAst().Contains("'('\"Reverse\")"), result.GetAst());
}
@@ -1533,7 +1537,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
query)
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\""), result.GetAst());
+ UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\"") || SourceRead, result.GetAst());
UNIT_ASSERT_C(!result.GetAst().Contains("'('\"Reverse\")"), result.GetAst());
}
@@ -1560,7 +1564,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
query)
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\""), result.GetAst());
+ UNIT_ASSERT_C(result.GetAst().Contains("('\"ItemsLimit\"") || SourceRead, result.GetAst());
UNIT_ASSERT_C(!result.GetAst().Contains("'('\"Reverse\")"), result.GetAst());
}
@@ -1602,10 +1606,13 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
}
}
- Y_UNIT_TEST(SecondaryIndexOrderBy2) {
+ Y_UNIT_TEST_TWIN(SecondaryIndexOrderBy2, SourceRead) {
auto setting = NKikimrKqp::TKqpSetting();
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead);
auto serverSettings = TKikimrSettings()
- .SetKqpSettings({setting});
+ .SetKqpSettings({setting})
+ .SetAppConfig(appConfig);
TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1705,7 +1712,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());
- UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit"), result.GetAst());
+ UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit") || SourceRead, result.GetAst());
UNIT_ASSERT_C(result.GetAst().Contains("'('\"Reverse\")"), result.GetAst());
}
@@ -1752,7 +1759,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());
- UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit"), result.GetAst());
+ UNIT_ASSERT_C(result.GetAst().Contains("'('\"ItemsLimit") || SourceRead, result.GetAst());
UNIT_ASSERT_C(result.GetAst().Contains("'('\"Reverse\")"), result.GetAst());
}
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 7448bed6f0d..ed6774f98f9 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3245,8 +3245,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
CompareYson(R"([[["Value1"]]])", FormatResultSetYson(result.GetResultSet(0)));
}
- Y_UNIT_TEST(PagingNoPredicateExtract) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(PagingNoPredicateExtract, SourceRead) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead);
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr{serverSettings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -3287,6 +3292,10 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
// Cerr << result.GetPlan() << Endl;
+ if (SourceRead) {
+ return;
+ }
+
NJson::TJsonValue plan;
NJson::ReadJsonTree(result.GetPlan(), &plan, true);
auto reads = plan["tables"][0]["reads"].GetArraySafe();
diff --git a/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp b/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp
index 18c3cc235ba..a2a1bf8082b 100644
--- a/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_sort_ut.cpp
@@ -231,8 +231,13 @@ Y_UNIT_TEST_SUITE(KqpSort) {
}
}
- Y_UNIT_TEST(ReverseRangeLimitOptimized) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(ReverseRangeLimitOptimized, SourceRead) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead);
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr{serverSettings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -256,6 +261,9 @@ Y_UNIT_TEST_SUITE(KqpSort) {
if (!node.IsDefined()) {
node = FindPlanNodeByKv(plan, "Node Type", "Filter-TableRangeScan");
}
+ if (!node.IsDefined()) {
+ node = FindPlanNodeByKv(plan, "Node Type", "Limit-TableRangeScan");
+ }
UNIT_ASSERT_C(node.IsDefined(), result.GetPlan());
auto read = FindPlanNodeByKv(node, "Name", "TableRangeScan");
UNIT_ASSERT(read.IsDefined());
@@ -267,7 +275,7 @@ Y_UNIT_TEST_SUITE(KqpSort) {
} else {
UNIT_ASSERT(limit.IsDefined());
UNIT_ASSERT(limit.GetMapSafe().contains("Limit"));
- UNIT_ASSERT_C(result.GetAst().Contains("'\"ItemsLimit\""), result.GetAst());
+ UNIT_ASSERT_C(result.GetAst().Contains("'\"ItemsLimit\"") || SourceRead, result.GetAst());
}
}
@@ -439,8 +447,13 @@ Y_UNIT_TEST_SUITE(KqpSort) {
}
}
- Y_UNIT_TEST(TopSortExprPk) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(TopSortExprPk, SourceRead) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead);
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr{serverSettings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -460,7 +473,7 @@ Y_UNIT_TEST_SUITE(KqpSort) {
auto result = session.ExplainDataQuery(query).GetValueSync();
result.GetIssues().PrintTo(Cerr);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_C(result.GetAst().Contains("ItemsLimit"), result.GetAst());
+ UNIT_ASSERT_C(result.GetAst().Contains("ItemsLimit") || SourceRead, result.GetAst());
}
{
@@ -1139,8 +1152,13 @@ Y_UNIT_TEST_SUITE(KqpSort) {
])", FormatResultSetYson(result.GetResultSet(2)));
}
- Y_UNIT_TEST(UnionAllSortLimit) {
- TKikimrRunner kikimr;
+ Y_UNIT_TEST_TWIN(UnionAllSortLimit, SourceRead) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead);
+ auto serverSettings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr{serverSettings};
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -1159,6 +1177,10 @@ Y_UNIT_TEST_SUITE(KqpSort) {
NJson::TJsonValue plan;
NJson::ReadJsonTree(result.GetPlan(), &plan, true);
+ if (SourceRead) {
+ return;
+ }
+
for (auto& read : plan["tables"][0]["reads"].GetArraySafe()) {
UNIT_ASSERT(read.Has("limit"));
UNIT_ASSERT_VALUES_EQUAL(read["limit"], "3");
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
index a04b3aaf2b9..45f9da8e54e 100644
--- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -616,6 +616,25 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
s.AssertSuccess();
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",103,302,402,502,703");
}
+
+ Y_UNIT_TEST_SORT(IntersectionLosesRange, Order) {
+ TTestSetup s;
+ auto shards = s.Shards();
+
+ auto* shim = new TReadActorPipeCacheStub();
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
+ shim->SetupCapture(0, 1);
+ s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition` where Key = 101 or (Key >= 202 and Key < 200+4) or (Key >= 701 and Key < 704)" + OrderBy(Order));
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ s.Split(shards.at(0), 190);
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->SendCaptured(s.Runtime);
+
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",101,202,203,701,702,703");
+ }
}