diff options
author | ssmike <ssmike@ydb.tech> | 2023-04-17 14:35:22 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-04-17 14:35:22 +0300 |
commit | a06be2f06b558b4c2f4038edaf8b12be4179a423 (patch) | |
tree | d4d466d8339e89f6c59f34d1380736573158f06e | |
parent | 420830e04a7935fb36b3be307000a2110980f5a9 (diff) | |
download | ydb-a06be2f06b558b4c2f4038edaf8b12be4179a423.tar.gz |
Rewrite physical reads to sources on last step
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp | 189 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp | 158 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h | 11 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp | 124 | ||||
-rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp | 171 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_explain_ut.cpp | 14 |
13 files changed, 236 insertions, 472 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 0c53539739d..c78eb86c075 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -703,7 +703,10 @@ protected: auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasReadRangesSource()); - YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections"); + YQL_ENSURE(stage.GetSources(0).GetInputIndex() == 0 && stage.SourcesSize() == 1); + for (auto& input : stage.inputs()) { + YQL_ENSURE(input.HasBroadcast()); + } auto& source = stage.GetSources(0).GetReadRangesSource(); diff --git a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt index 63bcbe9b24e..852ad9e1d57 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.darwin-x86_64.txt @@ -28,6 +28,7 @@ target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt index 5134021e615..7204a81d075 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-aarch64.txt @@ -29,6 +29,7 @@ target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp diff --git a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt index 5134021e615..7204a81d075 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.linux-x86_64.txt @@ -29,6 +29,7 @@ target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp diff --git a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt index 63bcbe9b24e..852ad9e1d57 100644 --- a/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/opt/physical/CMakeLists.windows-x86_64.txt @@ -28,6 +28,7 @@ target_sources(kqp-opt-physical PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_precompute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy_stage_float_up.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index cb093f1c7dd..2cc8d54b9d0 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -34,9 +34,7 @@ public: AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage)); AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage)); AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages)); - AddHandler(0, [](const TExprNode* node) { return TCoSort::Match(node) || TCoTopSort::Match(node); }, - HNDL(RemoveRedundantSortByPk)); - AddHandler(0, &TDqStage::Match, HNDL(RemoveRedundantSortByPkOverSource)); + AddHandler(0, [](auto) { return true; }, HNDL(RemoveRedundantSortByPk)); AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable)); AddHandler(0, &TCoFlatMap::Match, HNDL(PushOlapFilter)); AddHandler(0, &TCoAggregateCombine::Match, HNDL(PushAggregateCombineToStage)); @@ -80,7 +78,6 @@ public: AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>)); AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>)); AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>)); - AddHandler(0, &TDqStage::Match, HNDL(ApplyLimitToReadTableSource)); AddHandler(0, &TDqCnHashShuffle::Match, HNDL(BuildHashShuffleByKeyStage)); @@ -116,7 +113,7 @@ public: AddHandler(1, &TCoTake::Match, HNDL(PropagatePrecomuteTake<true>)); AddHandler(1, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<true>)); - AddHandler(2, &TDqStage::Match, HNDL(ExpandNullMembersForReadTableSource)); + AddHandler(2, &TDqStage::Match, HNDL(RewriteKqpReadTable)); #undef HNDL SetGlobal(1u); @@ -148,27 +145,15 @@ protected: return output; } - TMaybeNode<TExprBase> RemoveRedundantSortByPkOverSource(TExprBase node, TExprContext& ctx) { - TExprBase output = KqpRemoveRedundantSortByPkOverSource(node, ctx, KqpCtx); - DumpAppliedRule("RemoveRedundantSortByPkOverSource", node.Ptr(), output.Ptr(), ctx); - return output; - } - TMaybeNode<TExprBase> RemoveRedundantSortByPk(TExprBase node, TExprContext& ctx) { TExprBase output = KqpRemoveRedundantSortByPk(node, ctx, KqpCtx); DumpAppliedRule("RemoveRedundantSortByPk", node.Ptr(), output.Ptr(), ctx); return output; } - TMaybeNode<TExprBase> ExpandNullMembersForReadTableSource(TExprBase node, TExprContext& ctx) { - TExprBase output = ExpandSkipNullMembersForReadTableSource(node, ctx, KqpCtx); - DumpAppliedRule("ExpandSkipNullMembersForReadTableSource", node.Ptr(), output.Ptr(), ctx); - return output; - } - - TMaybeNode<TExprBase> ApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx) { - TExprBase output = KqpApplyLimitToReadTableSource(node, ctx, KqpCtx); - DumpAppliedRule("ApplyLimitToReadTableSource", node.Ptr(), output.Ptr(), ctx); + TMaybeNode<TExprBase> RewriteKqpReadTable(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpRewriteReadTable(node, ctx, KqpCtx); + DumpAppliedRule("RewriteKqpReadTable", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 22cd6719023..e84a9d6aeda 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -78,72 +78,6 @@ TMaybeNode<TDqPhyPrecompute> BuildLookupKeysPrecompute(const TExprBase& input, T .Done(); } -// ReadRangesSource can't deal with skipnullkeys, so we should expand it to (ExtractMembers (SkipNullKeys)) -//FIXME: simplify KIKIMR-16987 -NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext&) { - auto stage = node.Cast<TDqStage>(); - TMaybe<size_t> tableSourceIndex; - for (size_t i = 0; i < stage.Inputs().Size(); ++i) { - auto input = stage.Inputs().Item(i); - if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) { - tableSourceIndex = i; - } - } - if (!tableSourceIndex) { - return node; - } - - auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>(); - auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>(); - auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings()); - - if (settings.SkipNullKeys.empty()) { - return node; - } - - auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); - - THashSet<TString> seenColumns; - TVector<TCoAtom> columns; - TVector<TCoAtom> skipNullColumns; - for (size_t i = 0; i < readRangesSource.Columns().Size(); ++i) { - auto atom = readRangesSource.Columns().Item(i); - auto column = atom.StringValue(); - if (seenColumns.insert(column).second) { - columns.push_back(atom); - } - } - for (auto& column : settings.SkipNullKeys) { - TCoAtom atom(ctx.NewAtom(readRangesSource.Settings().Pos(), column)); - skipNullColumns.push_back(atom); - if (seenColumns.insert(column).second) { - columns.push_back(atom); - } - } - - settings.SkipNullKeys.clear(); - auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos()) - .Table(readRangesSource.Table()) - .Columns().Add(columns).Build() - .Settings(settings.BuildNode(ctx, source.Settings().Pos())) - .RangesExpr(readRangesSource.RangesExpr()) - .ExplainPrompt(readRangesSource.ExplainPrompt()) - .Done(); - TDqStage replacedSettings = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx); - - TCoArgument replaceArg{ctx.NewArgument(sourceArg.Pos(), TStringBuilder() << "_kqp_source_arg_0")}; - auto replaceExpr = - Build<TCoExtractMembers>(ctx, node.Pos()) - .Members(readRangesSource.Columns()) - .Input<TCoSkipNullMembers>() - .Input(replaceArg) - .Members().Add(skipNullColumns).Build() - .Build() - .Done(); - - return ReplaceStageArg(replacedSettings, *tableSourceIndex, replaceArg, replaceExpr, ctx); -} - TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (!node.Maybe<TKqlReadTable>()) { return node; @@ -151,12 +85,6 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp const TKqlReadTable& read = node.Cast<TKqlReadTable>(); auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path()); - bool useSource = kqpCtx.Config->EnableKqpScanQuerySourceRead && kqpCtx.IsScanQuery(); - useSource = useSource || (kqpCtx.Config->EnableKqpDataQuerySourceRead && kqpCtx.IsDataQuery()); - useSource = useSource && - tableDesc.Metadata->Kind != EKikimrTableKind::SysView && - tableDesc.Metadata->Kind != EKikimrTableKind::Olap; - TVector<TExprBase> values; TNodeOnNodeOwnedMap replaceMap; @@ -225,58 +153,24 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp .Build() .Done(); - if (useSource) { - for (size_t i = 0; i < values.size(); ++i) { - auto replace = Build<TCoNth>(ctx, read.Pos()) - .Tuple(precompute) - .Index().Build(ToString(i)) - .Done() - .Ptr(); - - rangeReplaces[values[i].Raw()] = replace; - } - } else { - TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")}; - programArgs.push_back(arg); + TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_pc_arg_0")}; + programArgs.push_back(arg); - for (size_t i = 0; i < values.size(); ++i) { - auto replace = Build<TCoNth>(ctx, read.Pos()) - .Tuple(arg) - .Index().Build(ToString(i)) - .Done() - .Ptr(); + for (size_t i = 0; i < values.size(); ++i) { + auto replace = Build<TCoNth>(ctx, read.Pos()) + .Tuple(arg) + .Index().Build(ToString(i)) + .Done() + .Ptr(); - rangeReplaces[values[i].Raw()] = replace; - } - inputs.push_back(precompute); + rangeReplaces[values[i].Raw()] = replace; } - } - - if (useSource) { - inputs.push_back( - Build<TDqSource>(ctx, read.Pos()) - .Settings<TKqpReadRangesSourceSettings>() - .Table(read.Table()) - .Columns(read.Columns()) - .Settings(read.Settings()) - .RangesExpr(ctx.ReplaceNodes(read.Range().Ptr(), rangeReplaces)) - .Build() - .DataSource<TCoDataSource>() - .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build() - .Build() - .Done()); + inputs.push_back(precompute); } TMaybeNode<TExprBase> phyRead; switch (tableDesc.Metadata->Kind) { case EKikimrTableKind::Datashard: - if (useSource) { - TCoArgument arg{ctx.NewArgument(read.Pos(), TStringBuilder() << "_kqp_source_arg")}; - programArgs.push_back(arg); - - phyRead = arg; - break; - } case EKikimrTableKind::SysView: phyRead = Build<TKqpReadTable>(ctx, read.Pos()) .Table(read.Table()) @@ -299,7 +193,7 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp .Body(phyRead.Cast()) .Build() .Settings(TDqStageSettings::New() - .SetSinglePartition(singleKey && useSource) + .SetSinglePartition(singleKey && UseSource(kqpCtx, tableDesc)) .BuildNode(ctx, read.Pos())) .Done(); @@ -311,7 +205,6 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp .Done(); } - TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const TParentsMap& parents) { @@ -323,12 +216,6 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, auto ranges = read.Ranges(); auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, read.Table().Path()); - bool useSource = kqpCtx.Config->EnableKqpScanQuerySourceRead && kqpCtx.IsScanQuery(); - useSource = useSource || (kqpCtx.Config->EnableKqpDataQuerySourceRead && kqpCtx.IsDataQuery()); - useSource = useSource && - tableDesc.Metadata->Kind != EKikimrTableKind::SysView && - tableDesc.Metadata->Kind != EKikimrTableKind::Olap; - bool fullScan = TCoVoid::Match(ranges.Raw()); TVector<TExprBase> input; @@ -416,58 +303,28 @@ TExprBase KqpBuildReadTableRangesStage(TExprBase node, TExprContext& ctx, .Done(); rangesExpr = precompute; - if (!useSource) { - argument = Build<TCoArgument>(ctx, read.Pos()) - .Name("_kqp_pc_ranges_arg_0") - .Done(); + argument = Build<TCoArgument>(ctx, read.Pos()) + .Name("_kqp_pc_ranges_arg_0") + .Done(); - input.push_back(precompute); - programArgs.push_back(argument.Cast<TCoArgument>()); - } + input.push_back(precompute); + programArgs.push_back(argument.Cast<TCoArgument>()); } else { rangesExpr = argument = read.Ranges(); } TMaybeNode<TExprBase> phyRead; - TMaybeNode<TExprBase> sourceArg; - if (useSource) { - YQL_ENSURE(rangesExpr.IsValid()); - - input.push_back( - Build<TDqSource>(ctx, read.Pos()) - .Settings<TKqpReadRangesSourceSettings>() - .Table(read.Table()) - .Columns(read.Columns()) - .Settings(read.Settings()) - .RangesExpr(rangesExpr.Cast()) - .ExplainPrompt(read.ExplainPrompt()) - .Build() - .DataSource<TCoDataSource>() - .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build() - .Build() - .Done()); - sourceArg = Build<TCoArgument>(ctx, read.Pos()) - .Name("_kqp_pc_source_arg_0") - .Done(); - programArgs.push_back(sourceArg.Cast<TCoArgument>()); - } - switch (tableDesc.Metadata->Kind) { case EKikimrTableKind::Datashard: case EKikimrTableKind::SysView: - if (useSource) { - YQL_ENSURE(sourceArg.IsValid()); - phyRead = sourceArg.Cast(); - } else { - phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos()) - .Table(read.Table()) - .Ranges(argument.Cast()) - .Columns(read.Columns()) - .Settings(read.Settings()) - .ExplainPrompt(read.ExplainPrompt()) - .Done(); - } + phyRead = Build<TKqpReadTableRanges>(ctx, read.Pos()) + .Table(read.Table()) + .Ranges(argument.Cast()) + .Columns(read.Columns()) + .Settings(read.Settings()) + .ExplainPrompt(read.ExplainPrompt()) + .Done(); break; case EKikimrTableKind::Olap: diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp index a32ef61a338..e538b09367f 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_limit.cpp @@ -8,164 +8,6 @@ namespace NKikimr::NKqp::NOpt { using namespace NYql; using namespace NYql::NNodes; -THashSet<const TExprNode*> CollectConnections(TDqStage stage, TExprBase node) { - THashSet<const TExprNode*> args; - for (auto&& arg : stage.Program().Args()) { - args.insert(arg.Raw()); - } - - THashSet<const TExprNode*> result; - TNodeOnNodeOwnedMap replaceMap; - VisitExpr(node.Ptr(), - [&](const TExprNode::TPtr& exprPtr) -> bool { - TExprBase expr(exprPtr); - if (expr.Maybe<TDqConnection>()) { - return false; - } - if (args.contains(exprPtr.Get())) { - result.insert(exprPtr.Get()); - } - return true; - }); - return result; -} - -//FIXME: simplify KIKIMR-16987 -TExprBase KqpApplyLimitToReadTableSource(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) { - auto stage = node.Cast<TDqStage>(); - TMaybe<size_t> tableSourceIndex; - for (size_t i = 0; i < stage.Inputs().Size(); ++i) { - auto input = stage.Inputs().Item(i); - if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) { - tableSourceIndex = i; - } - } - if (!tableSourceIndex) { - return node; - } - - auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>(); - auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>(); - auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings()); - - if (settings.ItemsLimit) { - return node; // already set? - } - - auto sourceArg = stage.Program().Args().Arg(*tableSourceIndex); - TExprNode::TPtr foundTake; - bool singleConsumer = true; - VisitExpr(stage.Program().Body().Ptr(), - [&](const TExprNode::TPtr& exprPtr) -> bool { - TExprBase expr(exprPtr); - if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) { - return false; - } - if (auto take = expr.Maybe<TCoTake>()) { - auto maybeSkip = take.Input().Maybe<TCoSkip>(); - auto input = (maybeSkip ? maybeSkip.Cast().Input() : take.Input()).Cast(); - if (input.Raw() == sourceArg.Raw()) { - auto ptr = take.Cast().Ptr(); - if (foundTake && foundTake != ptr) { - singleConsumer = false; - } - foundTake = ptr; - } - } - return true; - }); - - if (!singleConsumer || !foundTake) { - return node; - } - - auto take = TCoTake(foundTake); - - auto maybeSkip = take.Input().Maybe<TCoSkip>(); - auto input = maybeSkip ? maybeSkip.Cast().Input() : take.Input(); - - TMaybeNode<TExprBase> limitValue; - auto maybeTakeCount = take.Count().Maybe<TCoUint64>(); - auto maybeSkipCount = maybeSkip.Count().Maybe<TCoUint64>(); - - if (maybeTakeCount && (!maybeSkip || maybeSkipCount)) { - ui64 totalLimit = FromString<ui64>(maybeTakeCount.Cast().Literal().Value()); - - if (maybeSkipCount) { - totalLimit += FromString<ui64>(maybeSkipCount.Cast().Literal().Value()); - } - - limitValue = Build<TCoUint64>(ctx, node.Pos()) - .Literal<TCoAtom>() - .Value(ToString(totalLimit)).Build() - .Done(); - } else { - limitValue = take.Count(); - if (maybeSkip) { - limitValue = Build<TCoPlus>(ctx, node.Pos()) - .Left(limitValue.Cast()) - .Right(maybeSkip.Cast().Count()) - .Done(); - } - } - - YQL_CLOG(TRACE, ProviderKqp) << "-- set limit items value to " << limitValue.Cast().Ref().Dump(); - - if (limitValue.Maybe<TCoUint64>()) { - settings.SetItemsLimit(limitValue.Cast().Ptr()); - } else { - if (auto args = CollectConnections(stage, limitValue.Cast())) { - TVector<TCoArgument> stageArgs; - TVector<TExprBase> inputs; - TNodeOnNodeOwnedMap replaces; - - size_t index = 0; - for (auto&& arg : stage.Program().Args()) { - if (args.contains(arg.Raw())) { - TCoArgument replace{ctx.NewArgument(node.Pos(), TStringBuilder() << "_kqp_pc_arg_" << index)}; - inputs.push_back(stage.Inputs().Item(index)); - stageArgs.push_back(replace); - replaces[arg.Raw()] = replace.Ptr(); - } - index += 1; - } - - limitValue = Build<TDqCnValue>(ctx, node.Pos()) - .Output() - .Stage<TDqStage>() - .Settings().Build() - .Inputs().Add(inputs).Build() - .Program<TCoLambda>() - .Args(stageArgs) - .Body<TCoToStream>() - .Input<TCoJust>() - .Input(ctx.ReplaceNodes(limitValue.Cast().Ptr(), replaces)) - .Build() - .Build() - .Build() - .Build() - .Index().Build("0") - .Build() - .Done(); - } - - settings.SetItemsLimit(Build<TDqPrecompute>(ctx, node.Pos()) - .Input(limitValue.Cast()) - .Done().Ptr()); - } - - auto newSettings = Build<TKqpReadRangesSourceSettings>(ctx, source.Pos()) - .Table(readRangesSource.Table()) - .Columns(readRangesSource.Columns()) - .Settings(settings.BuildNode(ctx, source.Pos())) - .RangesExpr(readRangesSource.RangesExpr()) - .ExplainPrompt(readRangesSource.ExplainPrompt()) - .Done(); - - return ReplaceTableSourceSettings(stage, *tableSourceIndex, newSettings, ctx); -} - - TExprBase KqpApplyLimitToReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { if (!node.Maybe<TCoTake>()) { return node; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h index 512251ddd90..98fb1e57324 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h @@ -11,6 +11,8 @@ namespace NKikimr::NKqp::NOpt { +NYql::NNodes::TExprBase KqpRewriteReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); + NYql::NNodes::TExprBase KqpBuildReadTableStage(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); @@ -24,16 +26,9 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); -NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, - const TKqpOptimizeContext& kqpCtx); - -NYql::NNodes::TExprBase KqpApplyLimitToReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); - NYql::NNodes::TExprBase KqpApplyLimitToReadTable(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); -NYql::NNodes::TExprBase ExpandSkipNullMembersForReadTableSource(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); - NYql::NNodes::TExprBase KqpPushOlapFilter(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, NYql::TTypeAnnotationContext& typesCtx); @@ -50,4 +45,6 @@ NYql::NNodes::TExprBase KqpPropagatePrecomuteScalarRowset(NYql::NNodes::TExprBas bool AllowFuseJoinInputs(NYql::NNodes::TExprBase node); +bool UseSource(const TKqpOptimizeContext& kqpCtx, const NYql::TKikimrTableDescription& tableDesc); + } // NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp index d8adc43b99d..f0e11f37c20 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp @@ -16,14 +16,7 @@ using namespace NYql::NNodes; using TTableData = std::pair<const NYql::TKikimrTableDescription*, NYql::TKqpReadTableSettings>; -TExprBase KqpRemoveRedundantSortByPkBase( - TExprBase node, - TExprContext& ctx, - const TKqpOptimizeContext& kqpCtx, - std::function<TMaybe<TTableData>(TExprBase)> tableAccessor, - std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput, - bool allowSortForAllTables = false) -{ +TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { auto maybeSort = node.Maybe<TCoSort>(); auto maybeTopBase = node.Maybe<TCoTopBase>(); @@ -83,12 +76,13 @@ TExprBase KqpRemoveRedundantSortByPkBase( } } - auto tableData = tableAccessor(input); - if (!tableData) { + bool isReadTable = input.Maybe<TKqpReadTable>().IsValid(); + bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ; + if (!isReadTable && !isReadTableRanges) { return node; } - auto& tableDesc = *tableData->first; - auto settings = tableData->second; + auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges)); + auto settings = GetReadTableSettings(input, isReadTableRanges); auto checkKey = [keySelector, &tableDesc, &passthroughFields] (TExprBase key, ui32 index) { if (!key.Maybe<TCoMember>()) { @@ -129,7 +123,7 @@ TExprBase KqpRemoveRedundantSortByPkBase( bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap; if (direction == SortDirectionReverse) { - if (!allowSortForAllTables && !olapTable && kqpCtx.IsScanQuery()) { + if (!UseSource(kqpCtx, tableDesc) && !olapTable && kqpCtx.IsScanQuery()) { return node; } @@ -140,11 +134,11 @@ TExprBase KqpRemoveRedundantSortByPkBase( settings.SetReverse(); settings.SetSorted(); - input = rebuildInput(input, settings); + input = BuildReadNode(input.Pos(), ctx, input, settings); } else if (direction == SortDirectionForward) { - if (olapTable || allowSortForAllTables) { + if (olapTable || UseSource(kqpCtx, tableDesc)) { settings.SetSorted(); - input = rebuildInput(input, settings); + input = BuildReadNode(input.Pos(), ctx, input, settings); } } @@ -165,103 +159,5 @@ TExprBase KqpRemoveRedundantSortByPkBase( } } -TExprBase KqpRemoveRedundantSortByPk(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { - return KqpRemoveRedundantSortByPkBase(node, ctx, kqpCtx, - [&](TExprBase input) -> TMaybe<TTableData> { - bool isReadTable = input.Maybe<TKqpReadTable>().IsValid(); - bool isReadTableRanges = input.Maybe<TKqpReadTableRanges>().IsValid() || input.Maybe<TKqpReadOlapTableRanges>().IsValid() ; - if (!isReadTable && !isReadTableRanges) { - return Nothing(); - } - auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, GetReadTablePath(input, isReadTableRanges)); - auto settings = GetReadTableSettings(input, isReadTableRanges); - return TTableData{&tableDesc, settings}; - }, - [&](TExprBase input, NYql::TKqpReadTableSettings settings) { - return BuildReadNode(input.Pos(), ctx, input, settings); - }); -} - -//FIXME: simplify KIKIMR-16987 -NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource( - NYql::NNodes::TExprBase node, NYql::TExprContext& exprCtx, const TKqpOptimizeContext& kqpCtx) -{ - auto stage = node.Cast<TDqStage>(); - TMaybe<size_t> tableSourceIndex; - for (size_t i = 0; i < stage.Inputs().Size(); ++i) { - auto input = stage.Inputs().Item(i); - if (input.Maybe<TDqSource>() && input.Cast<TDqSource>().Settings().Maybe<TKqpReadRangesSourceSettings>()) { - tableSourceIndex = i; - } - } - if (!tableSourceIndex) { - return node; - } - - auto source = stage.Inputs().Item(*tableSourceIndex).Cast<TDqSource>(); - auto readRangesSource = source.Settings().Cast<TKqpReadRangesSourceSettings>(); - auto settings = TKqpReadTableSettings::Parse(readRangesSource.Settings()); - - auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, readRangesSource.Table().Path()); - - TVector<NYql::TKqpReadTableSettings> newSettings; - NYql::TNodeOnNodeOwnedMap bodyReplaces; - VisitExpr(stage.Program().Body().Ptr(), - [&](const TExprNode::TPtr& exprPtr) -> bool { - TExprBase expr(exprPtr); - if (expr.Maybe<TDqConnection>() || expr.Maybe<TDqPrecompute>() || expr.Maybe<TDqPhyPrecompute>()) { - return false; - } - if (TCoSort::Match(expr.Raw()) || TCoTopBase::Match(expr.Raw())) { - auto newExpr = KqpRemoveRedundantSortByPkBase(expr, exprCtx, kqpCtx, - [&](TExprBase node) -> TMaybe<TTableData> { - if (node.Ptr() != node.Ptr()) { - return Nothing(); - } - return TTableData{&tableDesc, settings}; - }, - [&](TExprBase input, NYql::TKqpReadTableSettings settings) { - newSettings.push_back(settings); - return input; - }, - /* allowSortForAllTables */ true); - if (newExpr.Ptr() != expr.Ptr()) { - bodyReplaces[expr.Raw()] = newExpr.Ptr(); - } - } - return true; - }); - - if (newSettings) { - for (size_t i = 1; i < newSettings.size(); ++i) { - if (newSettings[0] != newSettings[i]) { - return node; - } - } - - if (settings != newSettings[0]) { - auto newSource = Build<TKqpReadRangesSourceSettings>(exprCtx, source.Pos()) - .Table(readRangesSource.Table()) - .Columns(readRangesSource.Columns()) - .Settings(newSettings[0].BuildNode(exprCtx, source.Settings().Pos())) - .RangesExpr(readRangesSource.RangesExpr()) - .ExplainPrompt(readRangesSource.ExplainPrompt()) - .Done(); - stage = ReplaceTableSourceSettings(stage, *tableSourceIndex, newSource, exprCtx); - } - } - - if (bodyReplaces.empty()) { - return stage; - } - - return Build<TDqStage>(exprCtx, stage.Pos()) - .Inputs(stage.Inputs()) - .Outputs(stage.Outputs()) - .Settings(stage.Settings()) - .Program(TCoLambda(exprCtx.ReplaceNodes(stage.Program().Ptr(), bodyReplaces))) - .Done(); -} - } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp new file mode 100644 index 00000000000..1e9b774c0a9 --- /dev/null +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp @@ -0,0 +1,171 @@ +#include "kqp_opt_phy_rules.h" + +#include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/kqp/opt/kqp_opt_impl.h> +#include <ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h> +#include <ydb/core/tx/schemeshard/schemeshard_utils.h> + +#include <ydb/public/lib/scheme_types/scheme_type_id.h> + +#include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/core/yql_opt_utils.h> + +namespace NKikimr::NKqp::NOpt { + +using namespace NYql; +using namespace NYql::NDq; +using namespace NYql::NNodes; + + +bool UseSource(const TKqpOptimizeContext& kqpCtx, const NYql::TKikimrTableDescription& tableDesc) { + bool useSource = kqpCtx.Config->EnableKqpScanQuerySourceRead && kqpCtx.IsScanQuery(); + useSource = useSource || (kqpCtx.Config->EnableKqpDataQuerySourceRead && kqpCtx.IsDataQuery()); + useSource = useSource && + tableDesc.Metadata->Kind != EKikimrTableKind::SysView && + tableDesc.Metadata->Kind != EKikimrTableKind::Olap; + + return useSource; +} + +TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { + auto stage = node.Cast<TDqStage>(); + struct TMatchedRead { + TExprBase Expr; + TKqpTable Table; + TCoAtomList Columns; + TCoNameValueTupleList Settings; + TExprBase RangeExpr; + TMaybeNode<TCoNameValueTupleList> ExplainPrompt = {}; + }; + TMaybe<TMatchedRead> matched; + + TMaybeNode<TKqpReadTable> mayberead; + VisitExpr(stage.Program().Body().Ptr(), [&](const TExprNode::TPtr& node) { + TExprBase expr(node); + if (auto cast = expr.Maybe<TKqpReadTable>()) { + Y_ENSURE(!matched || matched->Expr.Raw() == node.Get()); + auto read = cast.Cast(); + matched = TMatchedRead { + .Expr = read, + .Table = read.Table(), + .Columns = read.Columns(), + .Settings = read.Settings(), + .RangeExpr = read.Range() + }; + } + + if (auto cast = expr.Maybe<TKqpReadTableRanges>()) { + Y_ENSURE(!matched || matched->Expr.Raw() == node.Get()); + auto read = cast.Cast(); + matched = TMatchedRead { + .Expr = read, + .Table = read.Table(), + .Columns = read.Columns(), + .Settings = read.Settings(), + .RangeExpr = read.Ranges(), + .ExplainPrompt = read.ExplainPrompt() + }; + } + return true; + }); + + if (!matched) { + return node; + } + + auto& tableDesc = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, matched->Table.Path()); + if (!UseSource(kqpCtx, tableDesc)) { + return node; + } + + auto settings = TKqpReadTableSettings::Parse(matched->Settings); + auto selectColumns = matched->Columns; + TVector<TCoAtom> skipNullColumns; + if (settings.SkipNullKeys) { + THashSet<TString> seenColumns; + TVector<TCoAtom> columns; + + for (size_t i = 0; i < matched->Columns.Size(); ++i) { + auto atom = matched->Columns.Item(i); + auto column = atom.StringValue(); + if (seenColumns.insert(column).second) { + columns.push_back(atom); + } + } + for (auto& column : settings.SkipNullKeys) { + TCoAtom atom(ctx.NewAtom(matched->Settings.Pos(), column)); + skipNullColumns.push_back(atom); + if (seenColumns.insert(column).second) { + columns.push_back(atom); + } + } + + matched->Columns = Build<TCoAtomList>(ctx, matched->Columns.Pos()).Add(columns).Done(); + + settings.SkipNullKeys.clear(); + matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos()); + } + + TVector<TExprBase> inputs; + TVector<TCoArgument> args; + TNodeOnNodeOwnedMap argReplaces; + TNodeOnNodeOwnedMap sourceReplaces; + + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + inputs.push_back(stage.Inputs().Item(i)); + + TCoArgument newArg{ctx.NewArgument(stage.Pos(), TStringBuilder() << "_kqp_pc_arg_" << i)}; + args.push_back(newArg); + + TCoArgument arg = stage.Program().Args().Arg(i); + + argReplaces[arg.Raw()] = newArg.Ptr(); + sourceReplaces[arg.Raw()] = stage.Inputs().Item(i).Ptr(); + } + + TCoArgument arg{ctx.NewArgument(stage.Pos(), TStringBuilder() << "_kqp_source_arg")}; + args.insert(args.begin(), arg); + if (skipNullColumns) { + argReplaces[matched->Expr.Raw()] = + Build<TCoExtractMembers>(ctx, node.Pos()) + .Members(selectColumns) + .Input<TCoSkipNullMembers>() + .Input<TCoToFlow>().Input(arg).Build() + .Members().Add(skipNullColumns).Build() + .Build() + .Done().Ptr(); + } else { + argReplaces[matched->Expr.Raw()] = + Build<TCoToFlow>(ctx, matched->Expr.Pos()) + .Input(arg) + .Done() + .Ptr(); + } + + auto source = + Build<TDqSource>(ctx, matched->Expr.Pos()) + .Settings<TKqpReadRangesSourceSettings>() + .Table(matched->Table) + .Columns(matched->Columns) + .Settings(matched->Settings) + .RangesExpr(matched->RangeExpr) + .ExplainPrompt(matched->ExplainPrompt) + .Build() + .DataSource<TCoDataSource>() + .Category<TCoAtom>().Value(KqpReadRangesSourceName).Build() + .Build() + .Done(); + inputs.insert(inputs.begin(), TExprBase(ctx.ReplaceNodes(source.Ptr(), sourceReplaces))); + + return Build<TDqStage>(ctx, stage.Pos()) + .Inputs().Add(inputs).Build() + .Outputs(stage.Outputs()) + .Settings(stage.Settings()) + .Program() + .Args(args) + .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argReplaces)) + .Build() + .Done(); +} + +} // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index b80d34f5a27..bd231ab15c9 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -836,14 +836,9 @@ public: TString lastKey = "(empty)"; if (!token.GetLastProcessedKey().empty()) { TStringBuilder builder; - TVector<NScheme::TTypeInfo> types; - for (auto& column : Settings.GetColumns()) { - types.push_back(NScheme::TTypeInfo((NScheme::TTypeId)column.GetType())); - } - TSerializedCellVec row(token.GetLastProcessedKey()); - lastKey = DebugPrintPoint(types, row.GetCells(), *AppData()->TypeRegistry); + lastKey = DebugPrintPoint(KeyColumnTypes, row.GetCells(), *AppData()->TypeRegistry); } return TStringBuilder() << "first request = " << token.GetFirstUnprocessedQuery() << " lastkey = " << lastKey; } diff --git a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp index c763040b3de..a669187a797 100644 --- a/ydb/core/kqp/ut/query/kqp_explain_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_explain_ut.cpp @@ -81,6 +81,9 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT(ValidatePlanNodeIds(plan)); auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter"); + if (!join.IsDefined()) { + join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"); + } UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); @@ -108,6 +111,9 @@ Y_UNIT_TEST_SUITE(KqpExplain) { UNIT_ASSERT(ValidatePlanNodeIds(plan)); auto join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter"); + if (!join.IsDefined()) { + join = FindPlanNodeByKv(plan, "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan"); + } UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); @@ -188,6 +194,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) { auto res = CollectStreamResult(it); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); UNIT_ASSERT(res.PlanJson); + Cerr << *res.PlanJson; NJson::TJsonValue plan; NJson::ReadJsonTree(*res.PlanJson, &plan, true); @@ -198,6 +205,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) { "Node Type", "Aggregate-InnerJoin (MapJoin)-Filter" ); + if (!join.IsDefined()) { + join = FindPlanNodeByKv( + plan, + "Node Type", + "Aggregate-InnerJoin (MapJoin)-Filter-TableFullScan" + ); + } UNIT_ASSERT(join.IsDefined()); auto left = FindPlanNodeByKv(join, "Table", "EightShard"); UNIT_ASSERT(left.IsDefined()); |