diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-02-22 14:18:41 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-02-22 14:18:41 +0300 |
commit | d7c89e8978179791514520aca563a81093f35013 (patch) | |
tree | 326fcd15f4eb8a0cba089769a6db70899a93e9b3 | |
parent | 97c20564c62e7a829ffcc73278bfdf186196c047 (diff) | |
download | ydb-d7c89e8978179791514520aca563a81093f35013.tar.gz |
use source read actor for point lookups
feature(kqp): use source read actor for lookups
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_partition_helper.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp | 102 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log_rules.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_explain_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_query_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 4 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table_split_ut.cpp | 8 |
15 files changed, 166 insertions, 91 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp index cf06397d15..53a7408059 100644 --- a/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partition_helper.cpp @@ -584,7 +584,14 @@ THashMap<ui64, TShardInfo> PrunePartitions(const TKqpTableKeys& tableKeys, keyColumnTypes, source.GetRanges(), stageInfo, typeEnv ); } else if (source.HasKeyRange()) { - ranges.push_back(MakeKeyRange(keyColumnTypes, source.GetKeyRange(), stageInfo, holderFactory, typeEnv)); + const auto& range = source.GetKeyRange(); + if (range.GetFrom().SerializeAsString() == range.GetTo().SerializeAsString() && + range.GetFrom().ValuesSize() == keyColumnTypes.size()) { + auto cells = FillKeyValues(keyColumnTypes, range.GetFrom(), stageInfo, holderFactory, typeEnv); + ranges.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(cells))); + } else { + ranges.push_back(MakeKeyRange(keyColumnTypes, range, stageInfo, holderFactory, typeEnv)); + } } else { ranges = BuildFullRange(keyColumnTypes); } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 8850429c70..2dd09e9913 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -633,7 +633,7 @@ void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettin } else { bool usePoints = true; for (auto& range : Ranges) { - if (std::holds_alternative<TSerializedCellVec>(range)) { + if (std::holds_alternative<TSerializedTableRange>(range)) { usePoints = false; } } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 9282ca1ca3..2bbbae1e8c 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -51,13 +51,16 @@ public: AddHandler(1, &TKqlLookupIndex::Match, HNDL(RewriteLookupIndex)); AddHandler(1, &TKqlStreamLookupIndex::Match, HNDL(RewriteStreamLookupIndex)); - AddHandler(2, &TKqlReadTableBase::Match, HNDL(ApplyExtractMembersToReadTable<true>)); - AddHandler(2, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<true>)); - AddHandler(2, &TKqpReadOlapTableRangesBase::Match, HNDL(ApplyExtractMembersToReadOlapTable<true>)); - AddHandler(2, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<true>)); + AddHandler(2, &TKqlLookupTable::Match, HNDL(RewriteLookupTable)); + + AddHandler(3, &TKqlReadTableBase::Match, HNDL(ApplyExtractMembersToReadTable<true>)); + AddHandler(3, &TKqlReadTableRangesBase::Match, HNDL(ApplyExtractMembersToReadTableRanges<true>)); + AddHandler(3, &TKqpReadOlapTableRangesBase::Match, HNDL(ApplyExtractMembersToReadOlapTable<true>)); + AddHandler(3, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<true>)); + #undef HNDL - SetGlobal(2u); + SetGlobal(3u); } protected: @@ -152,6 +155,12 @@ protected: return output; } + TMaybeNode<TExprBase> RewriteLookupTable(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpRewriteLookupTable(node, ctx, KqpCtx); + DumpAppliedRule("RewriteLookupTable", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> DeleteOverLookup(TExprBase node, TExprContext& ctx) { TExprBase output = KqpDeleteOverLookup(node, ctx, KqpCtx); DumpAppliedRule("DeleteOverLookup", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index 85a8af7b51..18779463d8 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -247,19 +247,11 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T .Index(indexName.Cast()) .Done(); } else { - if (kqpCtx.Config->EnableKqpDataQueryStreamPointLookup) { - readInput = Build<TKqlStreamLookupTable>(ctx, read.Pos()) - .Table(read.Table()) - .LookupKeys(lookupKeys) - .Columns(read.Columns()) - .Done(); - } else { - readInput = Build<TKqlLookupTable>(ctx, read.Pos()) - .Table(read.Table()) - .LookupKeys(lookupKeys) - .Columns(read.Columns()) - .Done(); - } + readInput = Build<TKqlLookupTable>(ctx, read.Pos()) + .Table(read.Table()) + .LookupKeys(lookupKeys) + .Columns(read.Columns()) + .Done(); } } else if (useScanQueryLookup) { YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamLookup); @@ -338,6 +330,90 @@ TExprBase KqpPushPredicateToReadTable(TExprBase node, TExprContext& ctx, const T .Done(); } +TExprBase KqpRewriteLookupTable(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + if (!node.Maybe<TKqlLookupTable>()) { + return node; + } + + const TKqlLookupTable& lookup = node.Cast<TKqlLookupTable>(); + if (!IsDqPureExpr(lookup.LookupKeys())) { + if (!kqpCtx.Config->EnableKqpDataQueryStreamLookup) { + return node; + } + + return Build<TKqlStreamLookupTable>(ctx, lookup.Pos()) + .Table(lookup.Table()) + .LookupKeys(lookup.LookupKeys()) + .Columns(lookup.Columns()) + .Done(); + } else { + if (!kqpCtx.Config->EnableKqpDataQuerySourceRead) { + return node; + } + + TMaybeNode<TExprBase> lookupKeys = lookup.LookupKeys(); + TMaybeNode<TCoSkipNullMembers> skipNullMembers; + if (lookupKeys.Maybe<TCoSkipNullMembers>()) { + skipNullMembers = lookupKeys.Cast<TCoSkipNullMembers>(); + lookupKeys = skipNullMembers.Input(); + } + + auto maybeAsList = lookupKeys.Maybe<TCoAsList>(); + if (!maybeAsList) { + return node; + } + + // one point expected + if (maybeAsList.Cast().ArgCount() != 1) { + return node; + } + + auto maybeStruct = maybeAsList.Cast().Arg(0).Maybe<TCoAsStruct>(); + if (!maybeStruct) { + return node; + } + + // full pk expected + const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, lookup.Table().Path().Value()); + if (table.Metadata->KeyColumnNames.size() != maybeStruct.Cast().ArgCount()) { + return node; + } + + std::unordered_map<TString, TExprBase> keyColumnsStruct; + for (const auto& item : maybeStruct.Cast()) { + const auto& tuple = item.Cast<TCoNameValueTuple>(); + keyColumnsStruct.insert({TString(tuple.Name().Value()), tuple.Value().Cast()}); + } + + TKqpReadTableSettings settings; + TVector<TExprBase> keyValues; + keyValues.reserve(maybeStruct.Cast().ArgCount()); + for (const auto& name : table.Metadata->KeyColumnNames) { + auto it = keyColumnsStruct.find(name); + YQL_ENSURE(it != keyColumnsStruct.end()); + keyValues.push_back(it->second); + + if (skipNullMembers) { + settings.AddSkipNullKey(name); + } + } + + return Build<TKqlReadTable>(ctx, lookup.Pos()) + .Table(lookup.Table()) + .Range<TKqlKeyRange>() + .From<TKqlKeyInc>() + .Add(keyValues) + .Build() + .To<TKqlKeyInc>() + .Add(keyValues) + .Build() + .Build() + .Columns(lookup.Columns()) + .Settings(settings.BuildNode(ctx, lookup.Pos())) + .Done(); + } +} + TExprBase KqpDropTakeOverLookupTable(const TExprBase& node, TExprContext&, const TKqpOptimizeContext& kqpCtx) { if (!node.Maybe<TCoTake>().Input().Maybe<TKqlLookupTableBase>()) { return node; diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h index d96b71b3ee..ecab5b3406 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_rules.h @@ -41,6 +41,9 @@ NYql::NNodes::TExprBase KqpRewriteLookupIndex(const NYql::NNodes::TExprBase& nod NYql::NNodes::TExprBase KqpRewriteStreamLookupIndex(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); +NYql::NNodes::TExprBase KqpRewriteLookupTable(const NYql::NNodes::TExprBase& node, NYql::TExprContext& ctx, + const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpRewriteTopSortOverIndexRead(const NYql::NNodes::TExprBase& node, NYql::TExprContext&, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b747c796ee..025fff3e3d 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -700,13 +700,24 @@ public: return; } - if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { - for (auto& issue : record.GetStatus().GetIssues()) { - CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); - Reads[id].Shard->Issues.push_back(issue); + switch (record.GetStatus().GetCode()) { + case Ydb::StatusIds::SUCCESS: + break; + case Ydb::StatusIds::OVERLOADED: + case Ydb::StatusIds::INTERNAL_ERROR: { + for (auto& issue : record.GetStatus().GetIssues()) { + CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); + Reads[id].Shard->Issues.push_back(issue); + } + return RetryRead(id); + } + default: { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); + return RuntimeError("Read request aborted", NYql::NDqProto::StatusIds::ABORTED, issues); } - return RetryRead(id); } + for (auto& lock : record.GetTxLocks()) { Locks.push_back(lock); } diff --git a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp index b97507e0ff..fe160d0228 100644 --- a/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp @@ -40,9 +40,9 @@ void Test(bool enableInplaceUpdate, const TString& query, TParams&& params, cons setting.SetName("_KqpAllowUnsafeCommit"); setting.SetValue("true"); - // stream lookup use iterator interface, that doesn't use datashard transactions + // source read use iterator interface, that doesn't use datashard transactions NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig) @@ -370,9 +370,9 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) { unsafeCommitSetting.SetName("_KqpAllowUnsafeCommit"); unsafeCommitSetting.SetValue("true"); - // stream lookup use iterator interface, that doesn't use datashard transactions + // source read use iterator interface, that doesn't use datashard transactions NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig) diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index 276dc9341c..eaf7acb1c0 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -82,14 +82,13 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto explainResult = session.ExplainDataQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString()); - if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst()); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpReadRangesSource"), explainResult.GetAst()); } else { UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); + UNIT_ASSERT_C(!explainResult.GetAst().Contains("Take"), explainResult.GetAst()); } - UNIT_ASSERT_C(!explainResult.GetAst().Contains("Take"), explainResult.GetAst()); - auto params = kikimr.GetTableClient().GetParamsBuilder() .AddParam("$key").Uint64(302).Build() .Build(); @@ -147,8 +146,8 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto explainResult = session.ExplainDataQuery(query).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString()); - if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst()); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) { + UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpReadRangesSource"), explainResult.GetAst()); } else { UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst()); } @@ -3396,28 +3395,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 2); } - - { - auto result = session.ExecuteDataQuery(R"( - --!syntax_v1 - - SELECT * FROM `/Root/KeyValue` - WHERE Key = 1; - )", TTxControl::BeginTx().CommitTx(), querySettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - CompareYson(R"([[[1u];["One"]]])", FormatResultSetYson(result.GetResultSet(0))); - - NJson::TJsonValue plan; - NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); - auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); - UNIT_ASSERT(streamLookup.IsDefined()); - - auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/KeyValue"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); - } } Y_UNIT_TEST(FlatmapLambdaMutiusedConnections) { diff --git a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp index 2034ab7476..774f44c7f6 100644 --- a/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ranges_ut.cpp @@ -650,14 +650,11 @@ Y_UNIT_TEST_SUITE(KqpRanges) { auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/MultiShardTable"); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 1); - - if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); - } + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 2); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); @@ -1048,18 +1045,17 @@ Y_UNIT_TEST_SUITE(KqpRanges) { auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + Cerr << result.GetQueryPlan() << Endl; + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 0); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).deletes().rows(), 0); - - if (!serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1); - } + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 3); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index 94f86a7897..7c8bd7726f 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -120,9 +120,9 @@ TParams BuildInsertIndexParams(TTableClient& client) { } // namespace Y_UNIT_TEST_SUITE(KqpQueryPerf) { - Y_UNIT_TEST_TWIN(KvRead, EnableStreamLookup) { + Y_UNIT_TEST_TWIN(KvRead, EnableSourceRead) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamPointLookup(EnableStreamLookup); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(EnableSourceRead); auto settings = TKikimrSettings() .SetAppConfig(appConfig); TKikimrRunner kikimr{settings}; @@ -147,33 +147,25 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { // Cerr << stats.query_plan() << Endl; - // TODO: Fix stream lookup case - if (!EnableStreamLookup) { - AssertTableStats(result, "/Root/EightShard", { - .ExpectedReads = 1, - }); - } + AssertTableStats(result, "/Root/EightShard", {.ExpectedReads = 1,}); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - - // TODO: Fix stream lookup case - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), EnableStreamLookup ? 0 : 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); NJson::TJsonValue plan; NJson::ReadJsonTree(stats.query_plan(), &plan, true); auto stages = FindPlanStages(plan); - // TODO: Fix stream lookup case - UNIT_ASSERT_VALUES_EQUAL(stages.size(), EnableStreamLookup ? 3 : 2); + UNIT_ASSERT_VALUES_EQUAL(stages.size(), 2); i64 totalTasks = 0; for (const auto& stage : stages) { totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("TotalTasks").GetIntegerSafe(); } - // TODO: Fix stream lookup case - UNIT_ASSERT_VALUES_EQUAL(totalTasks, EnableStreamLookup ? 3 : 2); + + UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2); } Y_UNIT_TEST_TWIN(RangeLimitRead, EnableSourceRead) { diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp index cd638d2ce8..c04bdb4fbb 100644 --- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp @@ -512,8 +512,8 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1); ui32 lookupsCount = 0; - if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup"); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) { + lookupsCount = CountPlanNodesByKv(plan, "Node Type", "Stage-TablePointLookup"); } else { lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr"); } diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 434b0d3ac2..636e93a7a1 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -274,7 +274,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(QueryTimeoutImmediate) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig); TKikimrRunner kikimr{settings}; @@ -414,7 +414,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(QueryCancelImmediate) { NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig); TKikimrRunner kikimr{settings}; diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index a484eb9b64..05819756ef 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { if (result.GetStatus() == EStatus::SUCCESS) continue; - if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead()) { UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue){ return issue.GetMessage().Contains("bellow low watermark"); diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index 5a054dba47..400361d6fe 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -1526,7 +1526,7 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderLockLost, StreamLookup) { Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) { TPortManager pm; NKikimrConfig::TAppConfig app; - app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetAppConfig(app) @@ -3505,7 +3505,7 @@ Y_UNIT_TEST(TestLateKqpDataReadAfterColumnDrop) { Y_UNIT_TEST(MvccTestSnapshotRead) { TPortManager pm; NKikimrConfig::TAppConfig app; - app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false); + app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); TServerSettings serverSettings(pm.GetPort(2134)); serverSettings.SetDomainName("Root") .SetAppConfig(app) diff --git a/ydb/services/ydb/ydb_table_split_ut.cpp b/ydb/services/ydb/ydb_table_split_ut.cpp index bb497a448c..8e1b82e514 100644 --- a/ydb/services/ydb/ydb_table_split_ut.cpp +++ b/ydb/services/ydb/ydb_table_split_ut.cpp @@ -215,7 +215,9 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) { "SELECT * FROM `/Root/Foo` \n" "WHERE NameHash = $name_hash AND Name = $name"; - TKikimrWithGrpcAndRootSchema server; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + TKikimrWithGrpcAndRootSchema server(appConfig); DoTestSplitByLoad(server, query, /* fill with data */ true, /* at least two splits */ 2); } @@ -365,7 +367,9 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) { TIntrusivePtr<TTestTimeProvider> testTimeProvider = new TTestTimeProvider(originalTimeProvider); NKikimr::TAppData::TimeProvider = testTimeProvider; - TKikimrWithGrpcAndRootSchema server; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false); + TKikimrWithGrpcAndRootSchema server(appConfig); // Set min uptime before merge by load to 10h TAtomic unused; |