aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-02-22 14:18:41 +0300
committerulya-sidorina <yulia@ydb.tech>2023-02-22 14:18:41 +0300
commitd7c89e8978179791514520aca563a81093f35013 (patch)
tree326fcd15f4eb8a0cba089769a6db70899a93e9b3
parent97c20564c62e7a829ffcc73278bfdf186196c047 (diff)
downloadydb-d7c89e8978179791514520aca563a81093f35013.tar.gz
use source read actor for point lookups
feature(kqp): use source read actor for lookups
-rw-r--r--ydb/core/kqp/executer_actor/kqp_partition_helper.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp2
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log.cpp19
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp102
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_rules.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp21
-rw-r--r--ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp8
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp33
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp16
-rw-r--r--ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp22
-rw-r--r--ydb/core/kqp/ut/query/kqp_explain_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/query/kqp_query_ut.cpp4
-rw-r--r--ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp4
-rw-r--r--ydb/services/ydb/ydb_table_split_ut.cpp8
15 files changed, 166 insertions, 91 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
index cf06397d15..53a7408059 100644
--- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp
@@ -584,7 +584,14 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys,
keyColumnTypes, source.GetRanges(), stageInfo, typeEnv
);
} else if (source.HasKeyRange()) {
- ranges.push_back(MakeKeyRange(keyColumnTypes, source.GetKeyRange(), stageInfo, holderFactory, typeEnv));
+ const auto& range = source.GetKeyRange();
+ if (range.GetFrom().SerializeAsString() == range.GetTo().SerializeAsString() &&
+ range.GetFrom().ValuesSize() == keyColumnTypes.size()) {
+ auto cells = FillKeyValues(keyColumnTypes, range.GetFrom(), stageInfo, holderFactory, typeEnv);
+ ranges.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(cells)));
+ } else {
+ ranges.push_back(MakeKeyRange(keyColumnTypes, range, stageInfo, holderFactory, typeEnv));
+ }
} else {
ranges = BuildFullRange(keyColumnTypes);
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 8850429c70..2dd09e9913 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -633,7 +633,7 @@ void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettin
} else {
bool usePoints = true;
for (auto& range : Ranges) {
- if (std::holds_alternative<TSerializedCellVec>(range)) {
+ if (std::holds_alternative<TSerializedTableRange>(range)) {
usePoints = false;
}
}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index 9282ca1ca3..2bbbae1e8c 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -51,13 +51,16 @@ public:
AddHandler(1, &TKqlLookupIndex::Match, HNDL(RewriteLookupIndex));
AddHandler(1, &TKqlStreamLookupIndex::Match, HNDL(RewriteStreamLookupIndex));
- AddHandler(2, &TKqlReadTableBase::Match, HNDL(ApplyExtractMembersToReadTable<true>));
- AddHandler(2, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<true>));
- AddHandler(2, &TKqpReadOlapTableRangesBase::Match, HNDL(ApplyExtractMembersToReadOlapTable<true>));
- AddHandler(2, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<true>));
+ AddHandler(2, &TKqlLookupTable::Match, HNDL(RewriteLookupTable));
+
+ AddHandler(3, &TKqlReadTableBase::Match, HNDL(ApplyExtractMembersToReadTable<true>));
+ AddHandler(3, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<true>));
+ AddHandler(3, &TKqpReadOlapTableRangesBase::Match, HNDL(ApplyExtractMembersToReadOlapTable<true>));
+ AddHandler(3, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<true>));
+
#undef HNDL
- SetGlobal(2u);
+ SetGlobal(3u);
}
protected:
@@ -152,6 +155,12 @@ protected:
return output;
}
+ TMaybeNode<TExprBase> RewriteLookupTable(TExprBase node, TExprContext& ctx) {
+ TExprBase output = KqpRewriteLookupTable(node, ctx, KqpCtx);
+ DumpAppliedRule("RewriteLookupTable", node.Ptr(), output.Ptr(), ctx);
+ return output;
+ }
+
TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx);
DumpAppliedRule("DeleteOverLookup", node.Ptr(), output.Ptr(), ctx);
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
index 85a8af7b51..18779463d8 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp
@@ -247,19 +247,11 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
.Index(indexName.Cast())
.Done();
} else {
- if (kqpCtx.Config->EnableKqpDataQueryStreamPointLookup) {
- readInput = Build<TKqlStreamLookupTable>(ctx, read.Pos())
- .Table(read.Table())
- .LookupKeys(lookupKeys)
- .Columns(read.Columns())
- .Done();
- } else {
- readInput = Build<TKqlLookupTable>(ctx, read.Pos())
- .Table(read.Table())
- .LookupKeys(lookupKeys)
- .Columns(read.Columns())
- .Done();
- }
+ readInput = Build<TKqlLookupTable>(ctx, read.Pos())
+ .Table(read.Table())
+ .LookupKeys(lookupKeys)
+ .Columns(read.Columns())
+ .Done();
}
} else if (useScanQueryLookup) {
YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup);
@@ -338,6 +330,90 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T
.Done();
}
+TExprBase KqpRewriteLookupTable(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
+ if (!node.Maybe<TKqlLookupTable>()) {
+ return node;
+ }
+
+ const TKqlLookupTable& lookup = node.Cast<TKqlLookupTable>();
+ if (!IsDqPureExpr(lookup.LookupKeys())) {
+ if (!kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
+ return node;
+ }
+
+ return Build<TKqlStreamLookupTable>(ctx, lookup.Pos())
+ .Table(lookup.Table())
+ .LookupKeys(lookup.LookupKeys())
+ .Columns(lookup.Columns())
+ .Done();
+ } else {
+ if (!kqpCtx.Config->EnableKqpDataQuerySourceRead) {
+ return node;
+ }
+
+ TMaybeNode<TExprBase> lookupKeys = lookup.LookupKeys();
+ TMaybeNode<TCoSkipNullMembers> skipNullMembers;
+ if (lookupKeys.Maybe<TCoSkipNullMembers>()) {
+ skipNullMembers = lookupKeys.Cast<TCoSkipNullMembers>();
+ lookupKeys = skipNullMembers.Input();
+ }
+
+ auto maybeAsList = lookupKeys.Maybe<TCoAsList>();
+ if (!maybeAsList) {
+ return node;
+ }
+
+ // one point expected
+ if (maybeAsList.Cast().ArgCount() != 1) {
+ return node;
+ }
+
+ auto maybeStruct = maybeAsList.Cast().Arg(0).Maybe<TCoAsStruct>();
+ if (!maybeStruct) {
+ return node;
+ }
+
+ // full pk expected
+ const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value());
+ if (table.Metadata->KeyColumnNames.size() != maybeStruct.Cast().ArgCount()) {
+ return node;
+ }
+
+ std::unordered_map<TString, TExprBase> keyColumnsStruct;
+ for (const auto& item : maybeStruct.Cast()) {
+ const auto& tuple = item.Cast<TCoNameValueTuple>();
+ keyColumnsStruct.insert({TString(tuple.Name().Value()), tuple.Value().Cast()});
+ }
+
+ TKqpReadTableSettings settings;
+ TVector<TExprBase> keyValues;
+ keyValues.reserve(maybeStruct.Cast().ArgCount());
+ for (const auto& name : table.Metadata->KeyColumnNames) {
+ auto it = keyColumnsStruct.find(name);
+ YQL_ENSURE(it != keyColumnsStruct.end());
+ keyValues.push_back(it->second);
+
+ if (skipNullMembers) {
+ settings.AddSkipNullKey(name);
+ }
+ }
+
+ return Build<TKqlReadTable>(ctx, lookup.Pos())
+ .Table(lookup.Table())
+ .Range<TKqlKeyRange>()
+ .From<TKqlKeyInc>()
+ .Add(keyValues)
+ .Build()
+ .To<TKqlKeyInc>()
+ .Add(keyValues)
+ .Build()
+ .Build()
+ .Columns(lookup.Columns())
+ .Settings(settings.BuildNode(ctx, lookup.Pos()))
+ .Done();
+ }
+}
+
TExprBase KqpDropTakeOverLookupTable(const TExprBase& node, TExprContext&, const TKqpOptimizeContext& kqpCtx) {
if (!node.Maybe<TCoTake>().Input().Maybe<TKqlLookupTableBase>()) {
return node;
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
index d96b71b3ee..ecab5b3406 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h
@@ -41,6 +41,9 @@ NYql::NNodes::TExprBase KqpRewriteLookupIndex(const NYql::NNodes::TExprBase& nod
NYql::NNodes::TExprBase KqpRewriteStreamLookupIndex(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
+NYql::NNodes::TExprBase KqpRewriteLookupTable(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx,
+ const TKqpOptimizeContext& kqpCtx);
+
NYql::NNodes::TExprBase KqpRewriteTopSortOverIndexRead(const NYql::NNodes::TExprBase& node, NYql::TExprContext&,
const TKqpOptimizeContext& kqpCtx);
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b747c796ee..025fff3e3d 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -700,13 +700,24 @@ public:
return;
}
- if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
- for (auto& issue : record.GetStatus().GetIssues()) {
- CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
- Reads[id].Shard->Issues.push_back(issue);
+ switch (record.GetStatus().GetCode()) {
+ case Ydb::StatusIds::SUCCESS:
+ break;
+ case Ydb::StatusIds::OVERLOADED:
+ case Ydb::StatusIds::INTERNAL_ERROR: {
+ for (auto& issue : record.GetStatus().GetIssues()) {
+ CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
+ Reads[id].Shard->Issues.push_back(issue);
+ }
+ return RetryRead(id);
+ }
+ default: {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
+ return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues);
}
- return RetryRead(id);
}
+
for (auto& lock : record.GetTxLocks()) {
Locks.push_back(lock);
}
diff --git a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
index b97507e0ff..fe160d0228 100644
--- a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
+++ b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp
@@ -40,9 +40,9 @@ void Test(bool enableInplaceUpdate, const TString& query, TParams&& params, cons
setting.SetName("_KqpAllowUnsafeCommit");
setting.SetValue("true");
- // stream lookup use iterator interface, that doesn't use datashard transactions
+ // source read use iterator interface, that doesn't use datashard transactions
NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
@@ -370,9 +370,9 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) {
unsafeCommitSetting.SetName("_KqpAllowUnsafeCommit");
unsafeCommitSetting.SetValue("true");
- // stream lookup use iterator interface, that doesn't use datashard transactions
+ // source read use iterator interface, that doesn't use datashard transactions
NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 276dc9341c..eaf7acb1c0 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -82,14 +82,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
- if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
- UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpReadRangesSource"), explainResult.GetAst());
} else {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
+ UNIT_ASSERT_C(!explainResult.GetAst().Contains("Take"), explainResult.GetAst());
}
- UNIT_ASSERT_C(!explainResult.GetAst().Contains("Take"), explainResult.GetAst());
-
auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$key").Uint64(302).Build()
.Build();
@@ -147,8 +146,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto explainResult = session.ExplainDataQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
- if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
- UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) {
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpReadRangesSource"), explainResult.GetAst());
} else {
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
}
@@ -3396,28 +3395,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2);
}
-
- {
- auto result = session.ExecuteDataQuery(R"(
- --!syntax_v1
-
- SELECT * FROM `/Root/KeyValue`
- WHERE Key = 1;
- )", TTxControl::BeginTx().CommitTx(), querySettings).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
- CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0)));
-
- NJson::TJsonValue plan;
- NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true);
- auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
- UNIT_ASSERT(streamLookup.IsDefined());
-
- auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1);
- }
}
Y_UNIT_TEST(FlatmapLambdaMutiusedConnections) {
diff --git a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
index 2034ab7476..774f44c7f6 100644
--- a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp
@@ -650,14 +650,11 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/MultiShardTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1);
-
- if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
- }
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
@@ -1048,18 +1045,17 @@ Y_UNIT_TEST_SUITE(KqpRanges) {
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ Cerr << result.GetQueryPlan() << Endl;
+
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 0);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).deletes().rows(), 0);
-
- if (!serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
- }
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 3);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
index 94f86a7897..7c8bd7726f 100644
--- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
+++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
@@ -120,9 +120,9 @@ TParams BuildInsertIndexParams(TTableClient& client) {
} // namespace
Y_UNIT_TEST_SUITE(KqpQueryPerf) {
- Y_UNIT_TEST_TWIN(KvRead, EnableStreamLookup) {
+ Y_UNIT_TEST_TWIN(KvRead, EnableSourceRead) {
NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamPointLookup(EnableStreamLookup);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(EnableSourceRead);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
@@ -147,33 +147,25 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
// Cerr << stats.query_plan() << Endl;
- // TODO: Fix stream lookup case
- if (!EnableStreamLookup) {
- AssertTableStats(result, "/Root/EightShard", {
- .ExpectedReads = 1,
- });
- }
+ AssertTableStats(result, "/Root/EightShard", {.ExpectedReads = 1,});
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
-
- // TODO: Fix stream lookup case
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), EnableStreamLookup ? 0 : 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1);
NJson::TJsonValue plan;
NJson::ReadJsonTree(stats.query_plan(), &plan, true);
auto stages = FindPlanStages(plan);
- // TODO: Fix stream lookup case
- UNIT_ASSERT_VALUES_EQUAL(stages.size(), EnableStreamLookup ? 3 : 2);
+ UNIT_ASSERT_VALUES_EQUAL(stages.size(), 2);
i64 totalTasks = 0;
for (const auto& stage : stages) {
totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe();
}
- // TODO: Fix stream lookup case
- UNIT_ASSERT_VALUES_EQUAL(totalTasks, EnableStreamLookup ? 3 : 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2);
}
Y_UNIT_TEST_TWIN(RangeLimitRead, EnableSourceRead) {
diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
index cd638d2ce8..c04bdb4fbb 100644
--- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
@@ -512,8 +512,8 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);
ui32 lookupsCount = 0;
- if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
- lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup");
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) {
+ lookupsCount = CountPlanNodesByKv(plan, "Node Type", "Stage-TablePointLookup");
} else {
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
}
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index 434b0d3ac2..636e93a7a1 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -274,7 +274,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
Y_UNIT_TEST(QueryTimeoutImmediate) {
NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
@@ -414,7 +414,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
Y_UNIT_TEST(QueryCancelImmediate) {
NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
index a484eb9b64..05819756ef 100644
--- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
+++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
if (result.GetStatus() == EStatus::SUCCESS)
continue;
- if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) {
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
[](const NYql::TIssue& issue){
return issue.GetMessage().Contains("bellow low watermark");
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 5a054dba47..400361d6fe 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -1526,7 +1526,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, StreamLookup) {
Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
- app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
@@ -3505,7 +3505,7 @@ Y_UNIT_TEST(TestLateKqpDataReadAfterColumnDrop) {
Y_UNIT_TEST(MvccTestSnapshotRead) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
- app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
+ app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
diff --git a/ydb/services/ydb/ydb_table_split_ut.cpp b/ydb/services/ydb/ydb_table_split_ut.cpp
index bb497a448c..8e1b82e514 100644
--- a/ydb/services/ydb/ydb_table_split_ut.cpp
+++ b/ydb/services/ydb/ydb_table_split_ut.cpp
@@ -215,7 +215,9 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) {
"SELECT * FROM `/Root/Foo` \n"
"WHERE NameHash = $name_hash AND Name = $name";
- TKikimrWithGrpcAndRootSchema server;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ TKikimrWithGrpcAndRootSchema server(appConfig);
DoTestSplitByLoad(server, query, /* fill with data */ true, /* at least two splits */ 2);
}
@@ -365,7 +367,9 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) {
TIntrusivePtr<TTestTimeProvider> testTimeProvider = new TTestTimeProvider(originalTimeProvider);
NKikimr::TAppData::TimeProvider = testTimeProvider;
- TKikimrWithGrpcAndRootSchema server;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
+ TKikimrWithGrpcAndRootSchema server(appConfig);
// Set min uptime before merge by load to 10h
TAtomic unused;