aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-04-17 14:35:22 +0300
committerssmike <ssmike@ydb.tech>2023-04-17 14:35:22 +0300
commita06be2f06b558b4c2f4038edaf8b12be4179a423 (patch)
treed4d466d8339e89f6c59f34d1380736573158f06e
parent420830e04a7935fb36b3be307000a2110980f5a9 (diff)
downloadydb-a06be2f06b558b4c2f4038edaf8b12be4179a423.tar.gz
Rewrite physical reads to sources on last step
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h5
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp25
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp189
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp158
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h11
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp124
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp171
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp7
-rw-r--r--ydb/core/kqp/ut/query/kqp_explain_ut.cpp14
13 files changed, 236 insertions, 472 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 0c53539739d..c78eb86c075 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -703,7 +703,10 @@ protected:
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasReadRangesSource());
- YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections");
+ YQL_ENSURE(stage.GetSources(0).GetInputIndex() == 0 && stage.SourcesSize() == 1);
+ for (auto& input : stage.inputs()) {
+ YQL_ENSURE(input.HasBroadcast());
+ }
auto& source = stage.GetSources(0).GetReadRangesSource();
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt
index 63bcbe9b24e..852ad9e1d57 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt
@@ -28,6 +28,7 @@ target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt
index 5134021e615..7204a81d075 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt
@@ -29,6 +29,7 @@ target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt
index 5134021e615..7204a81d075 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt
@@ -29,6 +29,7 @@ target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
diff --git a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt
index 63bcbe9b24e..852ad9e1d57 100644
--- a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt
@@ -28,6 +28,7 @@ target_sources(kqp-opt-physical PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index cb093f1c7dd..2cc8d54b9d0 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -34,9 +34,7 @@ public:
AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage));
AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage));
AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages));
- AddHandler(0, [](const TExprNode* node) { return TCoSort::Match(node) || TCoTopSort::Match(node); },
- HNDL(RemoveRedundantSortByPk));
- AddHandler(0, &TDqStage::Match, HNDL(RemoveRedundantSortByPkOverSource));
+ AddHandler(0, [](auto) { return true; }, HNDL(RemoveRedundantSortByPk));
AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));
AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter));
AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage));
@@ -80,7 +78,6 @@ public:
AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>));
AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
- AddHandler(0, &TDqStage::Match, HNDL(ApplyLimitToReadTableSource));
AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage));
@@ -116,7 +113,7 @@ public:
AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>));
AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>));
- AddHandler(2, &TDqStage::Match, HNDL(ExpandNullMembersForReadTableSource));
+ AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpReadTable));
#undef HNDL
SetGlobal(1u);
@@ -148,27 +145,15 @@ protected:
return output;
}
- TMaybeNode<TExprBase> RemoveRedundantSortByPkOverSource(TExprBase node, TExprContext& ctx) {
- TExprBase output = KqpRemoveRedundantSortByPkOverSource(node, ctx, KqpCtx);
- DumpAppliedRule("RemoveRedundantSortByPkOverSource", node.Ptr(), output.Ptr(), ctx);
- return output;
- }
-
TMaybeNode<TExprBase> RemoveRedundantSortByPk(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRemoveRedundantSortByPk(node, ctx, KqpCtx);
DumpAppliedRule("RemoveRedundantSortByPk", node.Ptr(), output.Ptr(), ctx);
return output;
}
- TMaybeNode<TExprBase> ExpandNullMembersForReadTableSource(TExprBase node, TExprContext& ctx) {
- TExprBase output = ExpandSkipNullMembersForReadTableSource(node, ctx, KqpCtx);
- DumpAppliedRule("ExpandSkipNullMembersForReadTableSource", node.Ptr(), output.Ptr(), ctx);
- return output;
- }
-
- TMaybeNode<TExprBase> ApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx) {
- TExprBase output = KqpApplyLimitToReadTableSource(node, ctx, KqpCtx);
- DumpAppliedRule("ApplyLimitToReadTableSource", node.Ptr(), output.Ptr(), ctx);
+ TMaybeNode<TExprBase> RewriteKqpReadTable(TExprBase node, TExprContext& ctx) {
+ TExprBase output = KqpRewriteReadTable(node, ctx, KqpCtx);
+ DumpAppliedRule("RewriteKqpReadTable", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 22cd6719023..e84a9d6aeda 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -78,72 +78,6 @@ TMaybeNode<TDqPhyPrecompute> BuildLookupKeysPrecompute(const TExprBase& input, T
.Done();
}
-// ReadRangesSource can't deal with skipnullkeys, so we should expand it to (ExtractMembers (SkipNullKeys))
-//FIXME: simplify KIKIMR-16987
-NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext&) {
- auto stage = node.Cast<TDqStage>();
- TMaybe<size_t> tableSourceIndex;
- for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
- auto input = stage.Inputs().Item(i);
- if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) {
- tableSourceIndex = i;
- }
- }
- if (!tableSourceIndex) {
- return node;
- }
-
- auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>();
- auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
- auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings());
-
- if (settings.SkipNullKeys.empty()) {
- return node;
- }
-
- auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
-
- THashSet<TString> seenColumns;
- TVector<TCoAtom> columns;
- TVector<TCoAtom> skipNullColumns;
- for (size_t i = 0; i < readRangesSource.Columns().Size(); ++i) {
- auto atom = readRangesSource.Columns().Item(i);
- auto column = atom.StringValue();
- if (seenColumns.insert(column).second) {
- columns.push_back(atom);
- }
- }
- for (auto& column : settings.SkipNullKeys) {
- TCoAtom atom(ctx.NewAtom(readRangesSource.Settings().Pos(), column));
- skipNullColumns.push_back(atom);
- if (seenColumns.insert(column).second) {
- columns.push_back(atom);
- }
- }
-
- settings.SkipNullKeys.clear();
- auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
- .Table(readRangesSource.Table())
- .Columns().Add(columns).Build()
- .Settings(settings.BuildNode(ctx, source.Settings().Pos()))
- .RangesExpr(readRangesSource.RangesExpr())
- .ExplainPrompt(readRangesSource.ExplainPrompt())
- .Done();
- TDqStage replacedSettings = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx);
-
- TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")};
- auto replaceExpr =
- Build<TCoExtractMembers>(ctx, node.Pos())
- .Members(readRangesSource.Columns())
- .Input<TCoSkipNullMembers>()
- .Input(replaceArg)
- .Members().Add(skipNullColumns).Build()
- .Build()
- .Done();
-
- return ReplaceStageArg(replacedSettings, *tableSourceIndex, replaceArg, replaceExpr, ctx);
-}
-
TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!node.Maybe<TKqlReadTable>()) {
return node;
@@ -151,12 +85,6 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
const TKqlReadTable& read = node.Cast<TKqlReadTable>();
auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path());
- bool useSource = kqpCtx.Config->EnableKqpScanQuerySourceRead && kqpCtx.IsScanQuery();
- useSource = useSource || (kqpCtx.Config->EnableKqpDataQuerySourceRead && kqpCtx.IsDataQuery());
- useSource = useSource &&
- tableDesc.Metadata->Kind != EKikimrTableKind::SysView &&
- tableDesc.Metadata->Kind != EKikimrTableKind::Olap;
-
TVector<TExprBase> values;
TNodeOnNodeOwnedMap replaceMap;
@@ -225,58 +153,24 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
.Build()
.Done();
- if (useSource) {
- for (size_t i = 0; i < values.size(); ++i) {
- auto replace = Build<TCoNth>(ctx, read.Pos())
- .Tuple(precompute)
- .Index().Build(ToString(i))
- .Done()
- .Ptr();
-
- rangeReplaces[values[i].Raw()] = replace;
- }
- } else {
- TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")};
- programArgs.push_back(arg);
+ TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")};
+ programArgs.push_back(arg);
- for (size_t i = 0; i < values.size(); ++i) {
- auto replace = Build<TCoNth>(ctx, read.Pos())
- .Tuple(arg)
- .Index().Build(ToString(i))
- .Done()
- .Ptr();
+ for (size_t i = 0; i < values.size(); ++i) {
+ auto replace = Build<TCoNth>(ctx, read.Pos())
+ .Tuple(arg)
+ .Index().Build(ToString(i))
+ .Done()
+ .Ptr();
- rangeReplaces[values[i].Raw()] = replace;
- }
- inputs.push_back(precompute);
+ rangeReplaces[values[i].Raw()] = replace;
}
- }
-
- if (useSource) {
- inputs.push_back(
- Build<TDqSource>(ctx, read.Pos())
- .Settings<TKqpReadRangesSourceSettings>()
- .Table(read.Table())
- .Columns(read.Columns())
- .Settings(read.Settings())
- .RangesExpr(ctx.ReplaceNodes(read.Range().Ptr(), rangeReplaces))
- .Build()
- .DataSource<TCoDataSource>()
- .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build()
- .Build()
- .Done());
+ inputs.push_back(precompute);
}
TMaybeNode<TExprBase> phyRead;
switch (tableDesc.Metadata->Kind) {
case EKikimrTableKind::Datashard:
- if (useSource) {
- TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_source_arg")};
- programArgs.push_back(arg);
-
- phyRead = arg;
- break;
- }
case EKikimrTableKind::SysView:
phyRead = Build<TKqpReadTable>(ctx, read.Pos())
.Table(read.Table())
@@ -299,7 +193,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
.Body(phyRead.Cast())
.Build()
.Settings(TDqStageSettings::New()
- .SetSinglePartition(singleKey && useSource)
+ .SetSinglePartition(singleKey && UseSource(kqpCtx, tableDesc))
.BuildNode(ctx, read.Pos()))
.Done();
@@ -311,7 +205,6 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
.Done();
}
-
TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx, const TParentsMap& parents)
{
@@ -323,12 +216,6 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
auto ranges = read.Ranges();
auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path());
- bool useSource = kqpCtx.Config->EnableKqpScanQuerySourceRead && kqpCtx.IsScanQuery();
- useSource = useSource || (kqpCtx.Config->EnableKqpDataQuerySourceRead && kqpCtx.IsDataQuery());
- useSource = useSource &&
- tableDesc.Metadata->Kind != EKikimrTableKind::SysView &&
- tableDesc.Metadata->Kind != EKikimrTableKind::Olap;
-
bool fullScan = TCoVoid::Match(ranges.Raw());
TVector<TExprBase> input;
@@ -416,58 +303,28 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx,
.Done();
rangesExpr = precompute;
- if (!useSource) {
- argument = Build<TCoArgument>(ctx, read.Pos())
- .Name("_kqp_pc_ranges_arg_0")
- .Done();
+ argument = Build<TCoArgument>(ctx, read.Pos())
+ .Name("_kqp_pc_ranges_arg_0")
+ .Done();
- input.push_back(precompute);
- programArgs.push_back(argument.Cast<TCoArgument>());
- }
+ input.push_back(precompute);
+ programArgs.push_back(argument.Cast<TCoArgument>());
} else {
rangesExpr = argument = read.Ranges();
}
TMaybeNode<TExprBase> phyRead;
- TMaybeNode<TExprBase> sourceArg;
- if (useSource) {
- YQL_ENSURE(rangesExpr.IsValid());
-
- input.push_back(
- Build<TDqSource>(ctx, read.Pos())
- .Settings<TKqpReadRangesSourceSettings>()
- .Table(read.Table())
- .Columns(read.Columns())
- .Settings(read.Settings())
- .RangesExpr(rangesExpr.Cast())
- .ExplainPrompt(read.ExplainPrompt())
- .Build()
- .DataSource<TCoDataSource>()
- .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build()
- .Build()
- .Done());
- sourceArg = Build<TCoArgument>(ctx, read.Pos())
- .Name("_kqp_pc_source_arg_0")
- .Done();
- programArgs.push_back(sourceArg.Cast<TCoArgument>());
- }
-
switch (tableDesc.Metadata->Kind) {
case EKikimrTableKind::Datashard:
case EKikimrTableKind::SysView:
- if (useSource) {
- YQL_ENSURE(sourceArg.IsValid());
- phyRead = sourceArg.Cast();
- } else {
- phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos())
- .Table(read.Table())
- .Ranges(argument.Cast())
- .Columns(read.Columns())
- .Settings(read.Settings())
- .ExplainPrompt(read.ExplainPrompt())
- .Done();
- }
+ phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos())
+ .Table(read.Table())
+ .Ranges(argument.Cast())
+ .Columns(read.Columns())
+ .Settings(read.Settings())
+ .ExplainPrompt(read.ExplainPrompt())
+ .Done();
break;
case EKikimrTableKind::Olap:
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
index a32ef61a338..e538b09367f 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp
@@ -8,164 +8,6 @@ namespace NKikimr::NKqp::NOpt {
using namespace NYql;
using namespace NYql::NNodes;
-THashSet<const TExprNode*> CollectConnections(TDqStage stage, TExprBase node) {
- THashSet<const TExprNode*> args;
- for (auto&& arg : stage.Program().Args()) {
- args.insert(arg.Raw());
- }
-
- THashSet<const TExprNode*> result;
- TNodeOnNodeOwnedMap replaceMap;
- VisitExpr(node.Ptr(),
- [&](const TExprNode::TPtr& exprPtr) -> bool {
- TExprBase expr(exprPtr);
- if (expr.Maybe<TDqConnection>()) {
- return false;
- }
- if (args.contains(exprPtr.Get())) {
- result.insert(exprPtr.Get());
- }
- return true;
- });
- return result;
-}
-
-//FIXME: simplify KIKIMR-16987
-TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
- auto stage = node.Cast<TDqStage>();
- TMaybe<size_t> tableSourceIndex;
- for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
- auto input = stage.Inputs().Item(i);
- if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) {
- tableSourceIndex = i;
- }
- }
- if (!tableSourceIndex) {
- return node;
- }
-
- auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>();
- auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
- auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings());
-
- if (settings.ItemsLimit) {
- return node; // already set?
- }
-
- auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex);
- TExprNode::TPtr foundTake;
- bool singleConsumer = true;
- VisitExpr(stage.Program().Body().Ptr(),
- [&](const TExprNode::TPtr& exprPtr) -> bool {
- TExprBase expr(exprPtr);
- if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) {
- return false;
- }
- if (auto take = expr.Maybe<TCoTake>()) {
- auto maybeSkip = take.Input().Maybe<TCoSkip>();
- auto input = (maybeSkip ? maybeSkip.Cast().Input() : take.Input()).Cast();
- if (input.Raw() == sourceArg.Raw()) {
- auto ptr = take.Cast().Ptr();
- if (foundTake && foundTake != ptr) {
- singleConsumer = false;
- }
- foundTake = ptr;
- }
- }
- return true;
- });
-
- if (!singleConsumer || !foundTake) {
- return node;
- }
-
- auto take = TCoTake(foundTake);
-
- auto maybeSkip = take.Input().Maybe<TCoSkip>();
- auto input = maybeSkip ? maybeSkip.Cast().Input() : take.Input();
-
- TMaybeNode<TExprBase> limitValue;
- auto maybeTakeCount = take.Count().Maybe<TCoUint64>();
- auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>();
-
- if (maybeTakeCount && (!maybeSkip || maybeSkipCount)) {
- ui64 totalLimit = FromString<ui64>(maybeTakeCount.Cast().Literal().Value());
-
- if (maybeSkipCount) {
- totalLimit += FromString<ui64>(maybeSkipCount.Cast().Literal().Value());
- }
-
- limitValue = Build<TCoUint64>(ctx, node.Pos())
- .Literal<TCoAtom>()
- .Value(ToString(totalLimit)).Build()
- .Done();
- } else {
- limitValue = take.Count();
- if (maybeSkip) {
- limitValue = Build<TCoPlus>(ctx, node.Pos())
- .Left(limitValue.Cast())
- .Right(maybeSkip.Cast().Count())
- .Done();
- }
- }
-
- YQL_CLOG(TRACE, ProviderKqp) << "-- set limit items value to " << limitValue.Cast().Ref().Dump();
-
- if (limitValue.Maybe<TCoUint64>()) {
- settings.SetItemsLimit(limitValue.Cast().Ptr());
- } else {
- if (auto args = CollectConnections(stage, limitValue.Cast())) {
- TVector<TCoArgument> stageArgs;
- TVector<TExprBase> inputs;
- TNodeOnNodeOwnedMap replaces;
-
- size_t index = 0;
- for (auto&& arg : stage.Program().Args()) {
- if (args.contains(arg.Raw())) {
- TCoArgument replace{ctx.NewArgument(node.Pos(), TStringBuilder() << "_kqp_pc_arg_" << index)};
- inputs.push_back(stage.Inputs().Item(index));
- stageArgs.push_back(replace);
- replaces[arg.Raw()] = replace.Ptr();
- }
- index += 1;
- }
-
- limitValue = Build<TDqCnValue>(ctx, node.Pos())
- .Output()
- .Stage<TDqStage>()
- .Settings().Build()
- .Inputs().Add(inputs).Build()
- .Program<TCoLambda>()
- .Args(stageArgs)
- .Body<TCoToStream>()
- .Input<TCoJust>()
- .Input(ctx.ReplaceNodes(limitValue.Cast().Ptr(), replaces))
- .Build()
- .Build()
- .Build()
- .Build()
- .Index().Build("0")
- .Build()
- .Done();
- }
-
- settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos())
- .Input(limitValue.Cast())
- .Done().Ptr());
- }
-
- auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos())
- .Table(readRangesSource.Table())
- .Columns(readRangesSource.Columns())
- .Settings(settings.BuildNode(ctx, source.Pos()))
- .RangesExpr(readRangesSource.RangesExpr())
- .ExplainPrompt(readRangesSource.ExplainPrompt())
- .Done();
-
- return ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx);
-}
-
-
TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
if (!node.Maybe<TCoTake>()) {
return node;
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index 512251ddd90..98fb1e57324 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -11,6 +11,8 @@
namespace NKikimr::NKqp::NOpt {
+NYql::NNodes::TExprBase KqpRewriteReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx);
+
NYql::NNodes::TExprBase KqpBuildReadTableStage(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
@@ -24,16 +26,9 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
-NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
- const TKqpOptimizeContext& kqpCtx);
-
-NYql::NNodes::TExprBase KqpApplyLimitToReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx);
-
NYql::NNodes::TExprBase KqpApplyLimitToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
-NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx);
-
NYql::NNodes::TExprBase KqpPushOlapFilter(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx);
@@ -50,4 +45,6 @@ NYql::NNodes::TExprBase KqpPropagatePrecomuteScalarRowset(NYql::NNodes::TExprBas
bool AllowFuseJoinInputs(NYql::NNodes::TExprBase node);
+bool UseSource(const TKqpOptimizeContext& kqpCtx, const NYql::TKikimrTableDescription& tableDesc);
+
} // NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
index d8adc43b99d..f0e11f37c20 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
@@ -16,14 +16,7 @@ using namespace NYql::NNodes;
using TTableData = std::pair<const NYql::TKikimrTableDescription*, NYql::TKqpReadTableSettings>;
-TExprBase KqpRemoveRedundantSortByPkBase(
- TExprBase node,
- TExprContext& ctx,
- const TKqpOptimizeContext& kqpCtx,
- std::function<TMaybe<TTableData>(TExprBase)> tableAccessor,
- std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput,
- bool allowSortForAllTables = false)
-{
+TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
auto maybeSort = node.Maybe<TCoSort>();
auto maybeTopBase = node.Maybe<TCoTopBase>();
@@ -83,12 +76,13 @@ TExprBase KqpRemoveRedundantSortByPkBase(
}
}
- auto tableData = tableAccessor(input);
- if (!tableData) {
+ bool isReadTable = input.Maybe<TKqpReadTable>().IsValid();
+ bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ;
+ if (!isReadTable && !isReadTableRanges) {
return node;
}
- auto& tableDesc = *tableData->first;
- auto settings = tableData->second;
+ auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges));
+ auto settings = GetReadTableSettings(input, isReadTableRanges);
auto checkKey = [keySelector, &tableDesc, &passthroughFields] (TExprBase key, ui32 index) {
if (!key.Maybe<TCoMember>()) {
@@ -129,7 +123,7 @@ TExprBase KqpRemoveRedundantSortByPkBase(
bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap;
if (direction == SortDirectionReverse) {
- if (!allowSortForAllTables && !olapTable && kqpCtx.IsScanQuery()) {
+ if (!UseSource(kqpCtx, tableDesc) && !olapTable && kqpCtx.IsScanQuery()) {
return node;
}
@@ -140,11 +134,11 @@ TExprBase KqpRemoveRedundantSortByPkBase(
settings.SetReverse();
settings.SetSorted();
- input = rebuildInput(input, settings);
+ input = BuildReadNode(input.Pos(), ctx, input, settings);
} else if (direction == SortDirectionForward) {
- if (olapTable || allowSortForAllTables) {
+ if (olapTable || UseSource(kqpCtx, tableDesc)) {
settings.SetSorted();
- input = rebuildInput(input, settings);
+ input = BuildReadNode(input.Pos(), ctx, input, settings);
}
}
@@ -165,103 +159,5 @@ TExprBase KqpRemoveRedundantSortByPkBase(
}
}
-TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
- return KqpRemoveRedundantSortByPkBase(node, ctx, kqpCtx,
- [&](TExprBase input) -> TMaybe<TTableData> {
- bool isReadTable = input.Maybe<TKqpReadTable>().IsValid();
- bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ;
- if (!isReadTable && !isReadTableRanges) {
- return Nothing();
- }
- auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges));
- auto settings = GetReadTableSettings(input, isReadTableRanges);
- return TTableData{&tableDesc, settings};
- },
- [&](TExprBase input, NYql::TKqpReadTableSettings settings) {
- return BuildReadNode(input.Pos(), ctx, input, settings);
- });
-}
-
-//FIXME: simplify KIKIMR-16987
-NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
- NYql::NNodes::TExprBase node, NYql::TExprContext& exprCtx, const TKqpOptimizeContext& kqpCtx)
-{
- auto stage = node.Cast<TDqStage>();
- TMaybe<size_t> tableSourceIndex;
- for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
- auto input = stage.Inputs().Item(i);
- if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) {
- tableSourceIndex = i;
- }
- }
- if (!tableSourceIndex) {
- return node;
- }
-
- auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>();
- auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>();
- auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings());
-
- auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path());
-
- TVector<NYql::TKqpReadTableSettings> newSettings;
- NYql::TNodeOnNodeOwnedMap bodyReplaces;
- VisitExpr(stage.Program().Body().Ptr(),
- [&](const TExprNode::TPtr& exprPtr) -> bool {
- TExprBase expr(exprPtr);
- if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) {
- return false;
- }
- if (TCoSort::Match(expr.Raw()) || TCoTopBase::Match(expr.Raw())) {
- auto newExpr = KqpRemoveRedundantSortByPkBase(expr, exprCtx, kqpCtx,
- [&](TExprBase node) -> TMaybe<TTableData> {
- if (node.Ptr() != node.Ptr()) {
- return Nothing();
- }
- return TTableData{&tableDesc, settings};
- },
- [&](TExprBase input, NYql::TKqpReadTableSettings settings) {
- newSettings.push_back(settings);
- return input;
- },
- /* allowSortForAllTables */ true);
- if (newExpr.Ptr() != expr.Ptr()) {
- bodyReplaces[expr.Raw()] = newExpr.Ptr();
- }
- }
- return true;
- });
-
- if (newSettings) {
- for (size_t i = 1; i < newSettings.size(); ++i) {
- if (newSettings[0] != newSettings[i]) {
- return node;
- }
- }
-
- if (settings != newSettings[0]) {
- auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos())
- .Table(readRangesSource.Table())
- .Columns(readRangesSource.Columns())
- .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos()))
- .RangesExpr(readRangesSource.RangesExpr())
- .ExplainPrompt(readRangesSource.ExplainPrompt())
- .Done();
- stage = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSource, exprCtx);
- }
- }
-
- if (bodyReplaces.empty()) {
- return stage;
- }
-
- return Build<TDqStage>(exprCtx, stage.Pos())
- .Inputs(stage.Inputs())
- .Outputs(stage.Outputs())
- .Settings(stage.Settings())
- .Program(TCoLambda(exprCtx.ReplaceNodes(stage.Program().Ptr(), bodyReplaces)))
- .Done();
-}
-
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
new file mode 100644
index 00000000000..1e9b774c0a9
--- /dev/null
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp
@@ -0,0 +1,171 @@
+#include "kqp_opt_phy_rules.h"
+
+#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/kqp/opt/kqp_opt_impl.h>
+#include <ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h>
+#include <ydb/core/tx/schemeshard/schemeshard_utils.h>
+
+#include <ydb/public/lib/scheme_types/scheme_type_id.h>
+
+#include <ydb/library/yql/dq/opt/dq_opt.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
+
+namespace NKikimr::NKqp::NOpt {
+
+using namespace NYql;
+using namespace NYql::NDq;
+using namespace NYql::NNodes;
+
+
+bool UseSource(const TKqpOptimizeContext& kqpCtx, const NYql::TKikimrTableDescription& tableDesc) {
+ bool useSource = kqpCtx.Config->EnableKqpScanQuerySourceRead && kqpCtx.IsScanQuery();
+ useSource = useSource || (kqpCtx.Config->EnableKqpDataQuerySourceRead && kqpCtx.IsDataQuery());
+ useSource = useSource &&
+ tableDesc.Metadata->Kind != EKikimrTableKind::SysView &&
+ tableDesc.Metadata->Kind != EKikimrTableKind::Olap;
+
+ return useSource;
+}
+
+TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
+ auto stage = node.Cast<TDqStage>();
+ struct TMatchedRead {
+ TExprBase Expr;
+ TKqpTable Table;
+ TCoAtomList Columns;
+ TCoNameValueTupleList Settings;
+ TExprBase RangeExpr;
+ TMaybeNode<TCoNameValueTupleList> ExplainPrompt = {};
+ };
+ TMaybe<TMatchedRead> matched;
+
+ TMaybeNode<TKqpReadTable> mayberead;
+ VisitExpr(stage.Program().Body().Ptr(), [&](const TExprNode::TPtr& node) {
+ TExprBase expr(node);
+ if (auto cast = expr.Maybe<TKqpReadTable>()) {
+ Y_ENSURE(!matched || matched->Expr.Raw() == node.Get());
+ auto read = cast.Cast();
+ matched = TMatchedRead {
+ .Expr = read,
+ .Table = read.Table(),
+ .Columns = read.Columns(),
+ .Settings = read.Settings(),
+ .RangeExpr = read.Range()
+ };
+ }
+
+ if (auto cast = expr.Maybe<TKqpReadTableRanges>()) {
+ Y_ENSURE(!matched || matched->Expr.Raw() == node.Get());
+ auto read = cast.Cast();
+ matched = TMatchedRead {
+ .Expr = read,
+ .Table = read.Table(),
+ .Columns = read.Columns(),
+ .Settings = read.Settings(),
+ .RangeExpr = read.Ranges(),
+ .ExplainPrompt = read.ExplainPrompt()
+ };
+ }
+ return true;
+ });
+
+ if (!matched) {
+ return node;
+ }
+
+ auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, matched->Table.Path());
+ if (!UseSource(kqpCtx, tableDesc)) {
+ return node;
+ }
+
+ auto settings = TKqpReadTableSettings::Parse(matched->Settings);
+ auto selectColumns = matched->Columns;
+ TVector<TCoAtom> skipNullColumns;
+ if (settings.SkipNullKeys) {
+ THashSet<TString> seenColumns;
+ TVector<TCoAtom> columns;
+
+ for (size_t i = 0; i < matched->Columns.Size(); ++i) {
+ auto atom = matched->Columns.Item(i);
+ auto column = atom.StringValue();
+ if (seenColumns.insert(column).second) {
+ columns.push_back(atom);
+ }
+ }
+ for (auto& column : settings.SkipNullKeys) {
+ TCoAtom atom(ctx.NewAtom(matched->Settings.Pos(), column));
+ skipNullColumns.push_back(atom);
+ if (seenColumns.insert(column).second) {
+ columns.push_back(atom);
+ }
+ }
+
+ matched->Columns = Build<TCoAtomList>(ctx, matched->Columns.Pos()).Add(columns).Done();
+
+ settings.SkipNullKeys.clear();
+ matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
+ }
+
+ TVector<TExprBase> inputs;
+ TVector<TCoArgument> args;
+ TNodeOnNodeOwnedMap argReplaces;
+ TNodeOnNodeOwnedMap sourceReplaces;
+
+ for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+ inputs.push_back(stage.Inputs().Item(i));
+
+ TCoArgument newArg{ctx.NewArgument(stage.Pos(), TStringBuilder() << "_kqp_pc_arg_" << i)};
+ args.push_back(newArg);
+
+ TCoArgument arg = stage.Program().Args().Arg(i);
+
+ argReplaces[arg.Raw()] = newArg.Ptr();
+ sourceReplaces[arg.Raw()] = stage.Inputs().Item(i).Ptr();
+ }
+
+ TCoArgument arg{ctx.NewArgument(stage.Pos(), TStringBuilder() << "_kqp_source_arg")};
+ args.insert(args.begin(), arg);
+ if (skipNullColumns) {
+ argReplaces[matched->Expr.Raw()] =
+ Build<TCoExtractMembers>(ctx, node.Pos())
+ .Members(selectColumns)
+ .Input<TCoSkipNullMembers>()
+ .Input<TCoToFlow>().Input(arg).Build()
+ .Members().Add(skipNullColumns).Build()
+ .Build()
+ .Done().Ptr();
+ } else {
+ argReplaces[matched->Expr.Raw()] =
+ Build<TCoToFlow>(ctx, matched->Expr.Pos())
+ .Input(arg)
+ .Done()
+ .Ptr();
+ }
+
+ auto source =
+ Build<TDqSource>(ctx, matched->Expr.Pos())
+ .Settings<TKqpReadRangesSourceSettings>()
+ .Table(matched->Table)
+ .Columns(matched->Columns)
+ .Settings(matched->Settings)
+ .RangesExpr(matched->RangeExpr)
+ .ExplainPrompt(matched->ExplainPrompt)
+ .Build()
+ .DataSource<TCoDataSource>()
+ .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build()
+ .Build()
+ .Done();
+ inputs.insert(inputs.begin(), TExprBase(ctx.ReplaceNodes(source.Ptr(), sourceReplaces)));
+
+ return Build<TDqStage>(ctx, stage.Pos())
+ .Inputs().Add(inputs).Build()
+ .Outputs(stage.Outputs())
+ .Settings(stage.Settings())
+ .Program()
+ .Args(args)
+ .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argReplaces))
+ .Build()
+ .Done();
+}
+
+} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index b80d34f5a27..bd231ab15c9 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -836,14 +836,9 @@ public:
TString lastKey = "(empty)";
if (!token.GetLastProcessedKey().empty()) {
TStringBuilder builder;
- TVector<NScheme::TTypeInfo> types;
- for (auto& column : Settings.GetColumns()) {
- types.push_back(NScheme::TTypeInfo((NScheme::TTypeId)column.GetType()));
- }
-
TSerializedCellVec row(token.GetLastProcessedKey());
- lastKey = DebugPrintPoint(types, row.GetCells(), *AppData()->TypeRegistry);
+ lastKey = DebugPrintPoint(KeyColumnTypes, row.GetCells(), *AppData()->TypeRegistry);
}
return TStringBuilder() << "first request = " << token.GetFirstUnprocessedQuery() << " lastkey = " << lastKey;
}
diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
index c763040b3de..a669187a797 100644
--- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp
@@ -81,6 +81,9 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT(ValidatePlanNodeIds(plan));
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
+ if (!join.IsDefined()) {
+ join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
+ }
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
@@ -108,6 +111,9 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
UNIT_ASSERT(ValidatePlanNodeIds(plan));
auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter");
+ if (!join.IsDefined()) {
+ join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan");
+ }
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());
@@ -188,6 +194,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
auto res = CollectStreamResult(it);
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
UNIT_ASSERT(res.PlanJson);
+ Cerr << *res.PlanJson;
NJson::TJsonValue plan;
NJson::ReadJsonTree(*res.PlanJson, &plan, true);
@@ -198,6 +205,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
"Node Type",
"Aggregate-InnerJoin (MapJoin)-Filter"
);
+ if (!join.IsDefined()) {
+ join = FindPlanNodeByKv(
+ plan,
+ "Node Type",
+ "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"
+ );
+ }
UNIT_ASSERT(join.IsDefined());
auto left = FindPlanNodeByKv(join, "Table", "EightShard");
UNIT_ASSERT(left.IsDefined());