diff options
author | Pavel Velikhov <pavelvelikhov@ydb.tech> | 2025-05-30 16:48:51 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-30 13:48:51 +0000 |
commit | 7b288ef1f52ea97c71014dac8ee95bbde82bd6e3 (patch) | |
tree | 90045d9c1589b38593bf6d3ea10b9af92ecdeb6b | |
parent | 657d1b4471360d0182f947e1e56d6a483f49c540 (diff) | |
download | ydb-7b288ef1f52ea97c71014dac8ee95bbde82bd6e3.tar.gz |
Generate final physical plan from rule based optimizer (#19028)
Co-authored-by: Pavel Velikhov <pavelvelikhov@localhost.localdomain>
-rw-r--r-- | ydb/core/kqp/host/kqp_runner.cpp | 44 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_type_ann.cpp | 16 | ||||
-rw-r--r-- | ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp | 54 | ||||
-rw-r--r-- | ydb/core/kqp/opt/peephole/kqp_opt_peephole.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp | 507 | ||||
-rw-r--r-- | ydb/core/kqp/opt/rbo/kqp_operator.cpp | 32 | ||||
-rw-r--r-- | ydb/core/kqp/opt/rbo/kqp_operator.h | 18 | ||||
-rw-r--r-- | ydb/core/kqp/opt/rbo/kqp_rbo.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/opt/rbo/kqp_rbo.h | 12 | ||||
-rw-r--r-- | ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp | 77 | ||||
-rw-r--r-- | ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h | 22 | ||||
-rw-r--r-- | ydb/core/kqp/query_compiler/kqp_query_compiler.cpp | 1 |
12 files changed, 646 insertions, 143 deletions
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index d6f0ce370f4..e75f803943a 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -334,7 +334,21 @@ private: .Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery") .Build(false)); - auto newRBOPhysicalOptimizeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx) + auto kqpTypeAnnTransformer = TTransformationPipeline(typesCtx) + .AddServiceTransformers() + .Add(Log("RBOTypeAnnotator"), "LogRBOTypeAnnotator") + .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) + .Build(false); + + auto newRBOPhysicalPeepholeTransformer = TTransformationPipeline(typesCtx) + .AddServiceTransformers() + .Add(Log("PhysicalPeephole"), "LogPhysicalPeephole") + .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) + .AddPostTypeAnnotation() + .Add(GetDqIntegrationPeepholeTransformer(false, typesCtx), "DqIntegrationPeephole") + .Build(false); + + auto newRBOPhysicalOptimizeTransformer = TTransformationPipeline(typesCtx) .AddServiceTransformers() .Add(Log("NewRBOPhysicalOptimize"), "LogNewRBOPhysicalOptimize") .AddPreTypeAnnotation() @@ -342,19 +356,20 @@ private: .AddIOAnnotation() .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) - .Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery") + //.Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery") .AddPostTypeAnnotation(/* forSubgraph */ true) - //.AddCommonOptimization() + //.AddCommonOptimization()s .Add(CreateKqpPgRewriteTransformer(OptimizeCtx, *typesCtx), "RewritePgSelect") - .Add(CreateKqpNewRBOTransformer(OptimizeCtx, *typesCtx, Config), "NewRBOTransformer") + .Add(CreateKqpNewRBOTransformer(OptimizeCtx, *typesCtx, Config, kqpTypeAnnTransformer, newRBOPhysicalPeepholeTransformer), "NewRBOTransformer") + .Add(CreateKqpRBOCleanupTransformer(*typesCtx), "RBOCleanupTransformer") //.Add(CreatePhysicalDataProposalsInspector(*typesCtx), "ProvidersPhysicalOptimize") //.Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize") //.Add(CreateKqpQueryPhasesTransformer(), "QueryPhases") //.Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects") //.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery") - .Build(false)); + .Build(false); auto physicalBuildTxsTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx) .AddServiceTransformers() @@ -421,19 +436,6 @@ private: *typesCtx), *typesCtx, Config), "Peephole") .Build(false); - auto newRBOPhysicalPeepholeTransformer = TTransformationPipeline(typesCtx) - .AddServiceTransformers() - .Add(Log("PhysicalPeephole"), "LogPhysicalPeephole") - .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config)) - .AddPostTypeAnnotation() - .Add(GetDqIntegrationPeepholeTransformer(false, typesCtx), "DqIntegrationPeephole") - .Add( - CreateKqpTxsPeepholeTransformer( - CreateTypeAnnotationTransformer( - CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config), - *typesCtx), *typesCtx, Config), "Peephole") - .Build(false); - TAutoPtr<IGraphTransformer> compilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster, *TransformCtx, *OptimizeCtx, @@ -452,15 +454,19 @@ private: { TTransformStage{ newRBOPhysicalOptimizeTransformer, "NewRBOPhysicalOptimize", TIssuesIds::DEFAULT_ERROR }, LogStage("NewRBOPhysicalOptimize"), + /* TTransformStage{ newRBOPhysicalBuildTxsTransformer, "NewRBOPhysicalBuildTxs", TIssuesIds::DEFAULT_ERROR }, LogStage("NewRBOPhysicalBuildTxs"), TTransformStage{ newRBOPhysicalBuildQueryTransformer, "NewRBOPhysicalBuildQuery", TIssuesIds::DEFAULT_ERROR }, LogStage("NewRBOPhysicalBuildQuery"), + */ TTransformStage{ CreateSaveExplainTransformerInput(*TransformCtx), "NewRBOSaveExplainTransformerInput", TIssuesIds::DEFAULT_ERROR }, + /* TTransformStage{ newRBOPhysicalPeepholeTransformer, "NewRBOPhysicalPeephole", TIssuesIds::DEFAULT_ERROR }, LogStage("NewRBOPhysicalPeephole"), + */ TTransformStage{ newRBOCompilePhysicalQuery, "CompilePhysicalQuery", TIssuesIds::DEFAULT_ERROR }, - TTransformStage{ newRBOPreparedExplainTransformer, "NewRBOExplainQuery", TIssuesIds::DEFAULT_ERROR }, // TODO(sk): only on stats mode or if explain-only + //TTransformStage{ newRBOPreparedExplainTransformer, "NewRBOExplainQuery", TIssuesIds::DEFAULT_ERROR }, // TODO(sk): only on stats mode or if explain-only }, false ); diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index d3d8920615b..5e4a89141f7 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -1430,14 +1430,20 @@ TStatus AnnotateKqpPhysicalTx(const TExprNode::TPtr& node, TExprContext& ctx) { return TStatus::Ok; } -TStatus AnnotateKqpPhysicalQuery(const TExprNode::TPtr& node, TExprContext& ctx) { +TStatus AnnotateKqpPhysicalQuery(const TExprNode::TPtr& node, TExprContext& ctx, bool enableRBO) { if (!EnsureArgsCount(*node, 3, ctx)) { return TStatus::Error; } - // TODO: ??? - - node->SetTypeAnn(ctx.MakeType<TVoidExprType>()); + // We need to infer the type of physical query for RBO at this time + if (enableRBO) { + TKqpPhysicalQuery query(node); + auto type = query.Results().Item(0).Ptr()->GetTypeAnn(); + node->SetTypeAnn(type); + } + else { + node->SetTypeAnn(ctx.MakeType<TVoidExprType>()); + } return TStatus::Ok; } @@ -2207,7 +2213,7 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl } if (TKqpPhysicalQuery::Match(input.Get())) { - return AnnotateKqpPhysicalQuery(input, ctx); + return AnnotateKqpPhysicalQuery(input, ctx, config->EnableNewRBO); } if (TKqpEffects::Match(input.Get())) { diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp index 4e853906c32..ca3132ce128 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp @@ -310,33 +310,6 @@ bool CanPropagateWideBlockThroughChannel( return true; } -TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx, - IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config, - bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts) -{ - TKqpPeepholePipelineConfigurator kqpPeephole(config, disabledOpts); - TKqpPeepholePipelineFinalConfigurator kqpPeepholeFinal(config); - TPeepholeSettings peepholeSettings; - peepholeSettings.CommonConfig = &kqpPeephole; - peepholeSettings.FinalConfig = &kqpPeepholeFinal; - peepholeSettings.WithFinalStageRules = withFinalStageRules; - peepholeSettings.WithNonDeterministicRules = false; - - bool hasNonDeterministicFunctions; - auto status = PeepHoleOptimizeNode(program.Ptr(), newProgram, ctx, typesCtx, &typeAnnTransformer, - hasNonDeterministicFunctions, peepholeSettings); - if (status == TStatus::Error) { - return status; - } - - if (!allowNonDeterministicFunctions && hasNonDeterministicFunctions) { - ctx.AddError(TIssue(ctx.GetPosition(program.Pos()), "Unexpected non-deterministic functions in KQP program")); - return TStatus::Error; - } - - return status; -} - TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprContext& ctx, IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages, TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts) @@ -623,6 +596,33 @@ private: } // anonymous namespace +TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx, + IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config, + bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts) +{ + TKqpPeepholePipelineConfigurator kqpPeephole(config, disabledOpts); + TKqpPeepholePipelineFinalConfigurator kqpPeepholeFinal(config); + TPeepholeSettings peepholeSettings; + peepholeSettings.CommonConfig = &kqpPeephole; + peepholeSettings.FinalConfig = &kqpPeepholeFinal; + peepholeSettings.WithFinalStageRules = withFinalStageRules; + peepholeSettings.WithNonDeterministicRules = false; + + bool hasNonDeterministicFunctions; + auto status = PeepHoleOptimizeNode(program.Ptr(), newProgram, ctx, typesCtx, &typeAnnTransformer, + hasNonDeterministicFunctions, peepholeSettings); + if (status == TStatus::Error) { + return status; + } + + if (!allowNonDeterministicFunctions && hasNonDeterministicFunctions) { + ctx.AddError(TIssue(ctx.GetPosition(program.Pos()), "Unexpected non-deterministic functions in KQP program")); + return TStatus::Error; + } + + return status; +} + TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer( NYql::IGraphTransformer* typeAnnTransformer, TTypeAnnotationContext& typesCtx, diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h index 2c489d58eaf..a03eb3f382a 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h @@ -18,4 +18,8 @@ TAutoPtr<NYql::IGraphTransformer> CreateKqpTxsPeepholeTransformer( const NYql::TKikimrConfiguration::TPtr& config ); +NYql::IGraphTransformer::TStatus PeepHoleOptimize(const NYql::NNodes::TExprBase& program, NYql::TExprNode::TPtr& newProgram, NYql::TExprContext& ctx, + NYql::IGraphTransformer& typeAnnTransformer, NYql::TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config, + bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts); + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp b/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp index b8584527aac..41259d25812 100644 --- a/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp @@ -1,4 +1,9 @@ #include "kqp_rbo.h" + +#include <yql/essentials/core/yql_graph_transformer.h> +#include <ydb/library/yql/dq/opt/dq_opt_peephole.h> +#include <ydb/core/kqp/opt/peephole/kqp_opt_peephole.h> + #include <yql/essentials/utils/log/log.h> using namespace NYql::NNodes; @@ -43,12 +48,328 @@ namespace { } } + [[maybe_unused]] + TExprNode::TPtr ExtractMembers(TExprNode::TPtr input, TExprContext& ctx, TVector<TInfoUnit> members) { + TVector<TExprBase> items; + auto arg = Build<TCoArgument>(ctx, input->Pos()).Name("arg").Done().Ptr(); + + for (auto iu : members) { + auto name = iu.GetFullName(); + auto tuple = Build<TCoNameValueTuple>(ctx, input->Pos()) + .Name().Build(name) + .Value<TCoMember>() + .Struct(arg) + .Name().Build(name) + .Build() + .Done(); + + items.push_back(tuple); + } + + return Build<TCoFlatMap>(ctx, input->Pos()) + .Input(input) + .Lambda<TCoLambda>() + .Args({arg}) + .Body<TCoJust>() + .Input<TCoAsStruct>() + .Add(items) + .Build() + .Build() + .Build() + .Done().Ptr(); + } + + [[maybe_unused]] + TExprNode::TPtr PeepholeStageLambda(TExprNode::TPtr stageLambda, + TVector<TExprNode::TPtr> inputs, + TExprContext& ctx, + TTypeAnnotationContext& types, + TAutoPtr<IGraphTransformer> typeAnnTransformer, + TAutoPtr<IGraphTransformer> peepholeTransformer, + TKikimrConfiguration::TPtr config) { + + Y_UNUSED(peepholeTransformer); + + // Compute types of inputs to stage lambda + + TVector<const TTypeAnnotationNode*> argTypes; + + for (auto input : inputs) { + typeAnnTransformer->Rewind(); + + IGraphTransformer::TStatus status(IGraphTransformer::TStatus::Ok); + do { + status = typeAnnTransformer->Transform(input, input, ctx); + } while (status == IGraphTransformer::TStatus::Repeat); + + if (status != IGraphTransformer::TStatus::Ok) { + YQL_CLOG(ERROR, ProviderKqp) << "RBO type annotation failed." << Endl << ctx.IssueManager.GetIssues().ToString(); + return nullptr; + } + + argTypes.push_back(input->GetTypeAnn()); + } + + // Build a temporary KqpProgram to run peephole on + auto program = Build<TKqpProgram>(ctx, stageLambda->Pos()) + .Lambda(stageLambda) + .ArgsType(ExpandType(stageLambda->Pos(), *ctx.MakeType<TTupleExprType>(argTypes), ctx)) + .Done(); + + auto programPtr = program.Ptr(); + + const bool allowNonDeterministicFunctions = false; + TExprNode::TPtr newProgram; + auto status = PeepHoleOptimize(program, newProgram, ctx, typeAnnTransformer.GetRef(), types, config, + allowNonDeterministicFunctions, true, {}); + if (status != IGraphTransformer::TStatus::Ok) { + ctx.AddError(TIssue(ctx.GetPosition(program.Pos()), "Peephole optimization failed for KQP transaction")); + return {}; + } + + return TKqpProgram(newProgram).Lambda().Ptr(); + } + + TExprNode::TPtr BuildCrossJoin(TOpJoin & join, TExprNode::TPtr leftInput, TExprNode::TPtr rightInput, TExprContext& ctx, TPositionHandle pos) { + + TCoArgument leftArg{ctx.NewArgument(pos, "_kqp_left")}; + TCoArgument rightArg{ctx.NewArgument(pos, "_kqp_right")}; + + TVector<TExprNode::TPtr> keys; + for (auto iu : join.GetLeftInput()->GetOutputIUs()) { + keys.push_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(iu.GetFullName()) + .Value<TCoMember>() + .Struct(leftArg) + .Name().Build(iu.GetFullName()) + .Build() + .Done().Ptr()); + } + + for (auto iu : join.GetRightInput()->GetOutputIUs()) { + keys.push_back( + Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(iu.GetFullName()) + .Value<TCoMember>() + .Struct(rightArg) + .Name().Build(iu.GetFullName()) + .Build() + .Done().Ptr()); + } + + // we have to `Condense` right input as single-element stream of lists (single list of all elements from the right), + // because stream supports single iteration only + auto itemArg = Build<TCoArgument>(ctx, pos).Name("item").Done(); + auto rightAsStreamOfLists = Build<TCoCondense1>(ctx, pos) + .Input<TCoToFlow>() + .Input(rightInput) + .Build() + .InitHandler() + .Args({itemArg}) + .Body<TCoAsList>() + .Add(itemArg) + .Build() + .Build() + .SwitchHandler() + .Args({"item", "state"}) + .Body<TCoBool>() + .Literal().Build("false") + .Build() + .Build() + .UpdateHandler() + .Args({"item", "state"}) + .Body<TCoAppend>() + .List("state") + .Item("item") + .Build() + .Build() + .Done(); + + auto flatMap = Build<TCoFlatMap>(ctx, pos) + .Input(rightAsStreamOfLists) + .Lambda() + .Args({"rightAsList"}) + .Body<TCoFlatMap>() + .Input(leftInput) + .Lambda() + .Args({leftArg}) + .Body<TCoMap>() + // here we have `List`, so we can iterate over it many times (for every `leftArg`) + .Input("rightAsList") + .Lambda() + .Args({rightArg}) + .Body<TCoAsStruct>() + .Add(keys) + .Build() + .Build() + .Build() + .Build() + .Build() + .Build() + .Done().Ptr(); + + return Build<TCoFromFlow>(ctx, pos) + .Input(flatMap) + .Done().Ptr(); + } + + TExprNode::TPtr ExpandJoinInput(TExprNode::TPtr input, TVector<TInfoUnit> ius, TExprContext& ctx) { + return ctx.Builder(input->Pos()) + .Callable("ExpandMap") + .Add(0, input) + .Lambda(1) + .Param("item") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + auto i = 0U; + for (const auto& iu: ius) { + parent.Callable(i, "Member") + .Arg(0, "item") + .Atom(1, iu.GetFullName()) + .Seal(); + i++; + } + return parent; + }) + .Seal() + .Seal().Build(); + } + + TExprNode::TPtr CollapseJoinOutput(TExprNode::TPtr graceJoin, TVector<TInfoUnit> ius, TExprContext& ctx) { + return ctx.Builder(graceJoin->Pos()) + .Callable("NarrowMap") + .Add(0, graceJoin) + .Lambda(1) + .Params("output", ius.size()) + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 i = 0U; + for (const auto& iu : ius) { + parent.List(i) + .Atom(0, iu.GetFullName()) + .Arg(1, "output", i) + .Seal(); + i++; + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Build(); + } + + TExprNode::TPtr BuildGraceJoinCore(TOpJoin & join, TExprNode::TPtr leftInput, TExprNode::TPtr rightInput, TExprContext& ctx, TPositionHandle pos) { + TVector<TCoAtom> leftColumnIdxs; + TVector<TCoAtom> rightColumnIdxs; + + TVector<TCoAtom> leftRenames, rightRenames; + + TVector<TCoAtom> leftKeyColumnNames; + TVector<TCoAtom> rightKeyColumnNames; + + auto leftIUs = join.GetLeftInput()->GetOutputIUs(); + auto rightIUs = join.GetRightInput()->GetOutputIUs(); + + auto leftTupleSize = leftIUs.size(); + for (size_t i=0; i<leftIUs.size(); i++) { + leftRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i).Done()); + leftRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i).Done()); + } + + for (size_t i=0; i<rightIUs.size(); i++) { + rightRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i).Done()); + rightRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i+leftTupleSize).Done()); + } + + for (auto k : join.JoinKeys) { + auto leftIdx = std::distance(leftIUs.begin(), std::find(leftIUs.begin(), leftIUs.end(), k.first)); + auto rightIdx = std::distance(rightIUs.begin(), std::find(rightIUs.begin(), rightIUs.end(), k.second)); + + leftColumnIdxs.push_back(Build<TCoAtom>(ctx, pos).Value(leftIdx).Done()); + rightColumnIdxs.push_back(Build<TCoAtom>(ctx, pos).Value(rightIdx).Done()); + + leftKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(k.first.GetFullName()).Done()); + rightKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(k.second.GetFullName()).Done()); + } + + // Convert to wide flow + + leftInput = Build<TCoToFlow>(ctx, pos) + .Input(leftInput) + .Done().Ptr(); + + leftInput = ExpandJoinInput(leftInput, join.GetLeftInput()->GetOutputIUs(), ctx); + + rightInput = Build<TCoToFlow>(ctx, pos) + .Input(rightInput) + .Done().Ptr(); + + rightInput = ExpandJoinInput(rightInput, join.GetRightInput()->GetOutputIUs(), ctx); + + auto graceJoin = Build<TCoGraceJoinCore>(ctx, pos) + .LeftInput(leftInput) + .RightInput(rightInput) + .JoinKind<TCoAtom>().Value("Inner").Build() + .LeftKeysColumns<TCoAtomList>().Add(leftColumnIdxs).Build() + .RightKeysColumns<TCoAtomList>().Add(rightColumnIdxs).Build() + .LeftRenames().Add(leftRenames).Build() + .RightRenames().Add(rightRenames).Build() + .LeftKeysColumnNames<TCoAtomList>().Add(leftKeyColumnNames).Build() + .RightKeysColumnNames<TCoAtomList>().Add(rightKeyColumnNames).Build() + .Flags().Build() + .Done().Ptr(); + + // Convert back to narrow stream + return Build<TCoFromFlow>(ctx, pos) + .Input(CollapseJoinOutput(graceJoin, join.GetOutputIUs(), ctx)) + .Done().Ptr(); + } + + [[maybe_unused]] + TExprNode::TPtr BuildDqGraceJoin(TOpJoin & join, TExprNode::TPtr leftInput, TExprNode::TPtr rightInput, TExprContext& ctx, TPositionHandle pos) { + TVector<TDqJoinKeyTuple> joinKeys; + TVector<TCoAtom> leftKeyColumnNames; + TVector<TCoAtom> rightKeyColumnNames; + + for (auto p : join.JoinKeys) { + TString leftFullName = "_alias_" + p.first.Alias + "." + p.first.ColumnName; + TString rightFullName = "_alias_" + p.second.Alias + "." + p.second.ColumnName; + + joinKeys.push_back(Build<TDqJoinKeyTuple>(ctx, pos) + .LeftLabel().Value("_alias_" + p.first.Alias).Build() + .LeftColumn().Value(p.first.ColumnName).Build() + .RightLabel().Value("_alias_" + p.second.Alias).Build() + .RightColumn().Value(p.second.ColumnName).Build() + .Done()); + + leftKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(leftFullName).Done()); + rightKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(rightFullName).Done()); + } + + return Build<TDqPhyGraceJoin>(ctx, pos) + .LeftInput(leftInput) + .LeftLabel<TCoVoid>().Build() + .RightInput(rightInput) + .RightLabel<TCoVoid>().Build() + .JoinType<TCoAtom>().Value("Inner").Build() + .JoinKeys<TDqJoinKeyTupleList>().Add(joinKeys).Build() + .LeftJoinKeyNames<TCoAtomList>().Add(leftKeyColumnNames).Build() + .RightJoinKeyNames<TCoAtomList>().Add(rightKeyColumnNames).Build() + .Done().Ptr(); + } + + } namespace NKikimr { namespace NKqp { -TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { +TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx, TTypeAnnotationContext& types, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peepholeTransformer, TKikimrConfiguration::TPtr config) { + Y_UNUSED(types); + Y_UNUSED(peepholeTransformer); + Y_UNUSED(config); + THashMap<int, TExprNode::TPtr> stages; THashMap<int, TVector<TExprNode::TPtr>> stageArgs; auto & graph = root.PlanProps.StageGraph; @@ -112,40 +433,45 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { auto filter = TKqpOpFilter(op->Node); auto filterBody = filter.Lambda().Body(); - auto arg = Build<TCoArgument>(ctx, root.Node->Pos()).Name("arg").Done().Ptr(); - auto newFilterBody = ReplaceArg(filterBody.Ptr(), arg, ctx); + auto filter_arg = Build<TCoArgument>(ctx, root.Node->Pos()).Name("arg").Done().Ptr(); + auto map_arg = Build<TCoArgument>(ctx, root.Node->Pos()).Name("arg").Done().Ptr(); + + auto newFilterBody = ReplaceArg(filterBody.Ptr(), filter_arg, ctx); newFilterBody = ctx.Builder(root.Node->Pos()).Callable("FromPg").Add(0, newFilterBody).Seal().Build(); TVector<TExprBase> items; for (auto iu : op->GetOutputIUs()) { - auto memberName = "_alias_" + iu.Alias + "." + iu.ColumnName; + auto memberName = iu.GetFullName(); auto tuple = Build<TCoNameValueTuple>(ctx, root.Node->Pos()) .Name().Build(memberName) .Value<TCoMember>() - .Struct(arg) + .Struct(map_arg) .Name().Build(memberName) .Build() .Done(); items.push_back(tuple); } - - currentStageBody = Build<TCoFlatMap>(ctx, root.Node->Pos()) - .Input(TExprBase(currentStageBody)) - .Lambda<TCoLambda>() - .Args({arg}) - .Body<TCoOptionalIf>() - .Predicate<TCoCoalesce>() + + currentStageBody = Build<TCoMap>(ctx, root.Node->Pos()) + .Input<TCoFilter>() + .Input(TExprBase(currentStageBody)) + .Lambda<TCoLambda>() + .Args({filter_arg}) + .Body<TCoCoalesce>() .Predicate(newFilterBody) .Value<TCoBool>() .Literal().Build("false") .Build() .Build() - .Value<TCoAsStruct>() - .Add(items) .Build() .Build() + .Lambda<TCoLambda>() + .Args({map_arg}) + .Body<TCoAsStruct>() + .Add(items) + .Build() .Build() .Done().Ptr(); @@ -175,14 +501,12 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { items.push_back(tuple); } - currentStageBody = Build<TCoFlatMap>(ctx, root.Node->Pos()) + currentStageBody = Build<TCoMap>(ctx, root.Node->Pos()) .Input(TExprBase(currentStageBody)) .Lambda<TCoLambda>() .Args({arg}) - .Body<TCoJust>() - .Input<TCoAsStruct>() - .Add(items) - .Build() + .Body<TCoAsStruct>() + .Add(items) .Build() .Build() .Done().Ptr(); @@ -214,51 +538,13 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { stageArgs[opStageId].push_back(rightArg); if (join->JoinKind == "Cross") { - currentStageBody = Build<TDqPhyCrossJoin>(ctx, root.Node->Pos()) - .LeftInput(leftInput) - .LeftLabel<TCoVoid>().Build() - .RightInput(rightInput) - .RightLabel<TCoVoid>().Build() - .JoinType<TCoAtom>().Value("Cross").Build() - .JoinKeys<TDqJoinKeyTupleList>().Build() - .LeftJoinKeyNames<TCoAtomList>().Build() - .RightJoinKeyNames<TCoAtomList>().Build() - .Done().Ptr(); + currentStageBody = BuildCrossJoin(*join, leftInput, rightInput, ctx, root.Node->Pos()); } else if (join->JoinKind == "Inner") { - TVector<TDqJoinKeyTuple> joinKeys; - TVector<TCoAtom> leftKeyColumnNames; - TVector<TCoAtom> rightKeyColumnNames; - - for (auto p : join->JoinKeys) { - TString leftFullName = "_alias_" + p.first.Alias + "." + p.first.ColumnName; - TString rightFullName = "_alias_" + p.second.Alias + "." + p.second.ColumnName; - - joinKeys.push_back(Build<TDqJoinKeyTuple>(ctx, root.Node->Pos()) - .LeftLabel().Value("_alias_" + p.first.Alias).Build() - .LeftColumn().Value(p.first.ColumnName).Build() - .RightLabel().Value("_alias_" + p.second.Alias).Build() - .RightColumn().Value(p.second.ColumnName).Build() - .Done()); - - leftKeyColumnNames.push_back(Build<TCoAtom>(ctx, root.Node->Pos()).Value(leftFullName).Done()); - rightKeyColumnNames.push_back(Build<TCoAtom>(ctx, root.Node->Pos()).Value(rightFullName).Done()); - } - - currentStageBody = Build<TDqPhyGraceJoin>(ctx, root.Node->Pos()) - .LeftInput(leftInput) - .LeftLabel<TCoVoid>().Build() - .RightInput(rightInput) - .RightLabel<TCoVoid>().Build() - .JoinType<TCoAtom>().Value("Inner").Build() - .JoinKeys<TDqJoinKeyTupleList>().Add(joinKeys).Build() - .LeftJoinKeyNames<TCoAtomList>().Add(leftKeyColumnNames).Build() - .RightJoinKeyNames<TCoAtomList>().Add(rightKeyColumnNames).Build() - .Done().Ptr(); - } + currentStageBody = BuildGraceJoinCore(*join, leftInput, rightInput, ctx, root.Node->Pos()); + } else { Y_ENSURE(false, "Unsupported join kind"); - } stages[opStageId] = currentStageBody; @@ -271,9 +557,10 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { // We need to build up stages in a topological sort order graph.TopologicalSort(); - THashMap<int, TExprNode::TPtr> finalizedStages; + TVector<TExprNode::TPtr> txStages; + auto stageIds = graph.StageIds; auto stageInputIds = graph.StageInputs; @@ -286,7 +573,11 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { auto inputStage = finalizedStages.at(inputStageId); auto c = graph.GetConnection(inputStageId, id); YQL_CLOG(TRACE, CoreDq) << "Building connection: " << inputStageId << "->" << id << ", " << c->Type; - auto conn = c->BuildConnection(inputStage, root.Node, ctx); + TExprNode::TPtr newStage; + auto conn = c->BuildConnection(inputStage, root.Node, newStage, ctx); + if (newStage) { + txStages.push_back(newStage); + } YQL_CLOG(TRACE, CoreDq) << "Built connection: " << inputStageId << "->" << id << ", " << c->Type; inputs.push_back(conn); } @@ -295,7 +586,7 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { stage = stages.at(id); } else { - stage = Build<TDqStage>(ctx, root.Node->Pos()) + stage = Build<TDqPhyStage>(ctx, root.Node->Pos()) .Inputs() .Add(inputs) .Build() @@ -304,7 +595,11 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { .Body(stages.at(id)) .Build() .Settings().Build() - .Done().Ptr(); + .Done() + .Ptr(); + + txStages.push_back(stage); + YQL_CLOG(TRACE, CoreDq) << "Added stage " << stage->UniqueId(); } finalizedStages[id] = stage; @@ -314,15 +609,91 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) { // Build a union all for the last stage int lastStageIdx = stageIds[stageIds.size()-1]; - auto result = Build<TDqCnUnionAll>(ctx, root.Node->Pos()) + auto lastStage = finalizedStages.at(lastStageIdx); + + // wrap in DqResult + auto dqResult = Build<TDqCnResult>(ctx, root.Node->Pos()) .Output() - .Stage(finalizedStages.at(lastStageIdx)) + .Stage(lastStage) .Index().Build("0") .Build() - .Done().Ptr(); + .ColumnHints().Build() + .Done() + .Ptr(); + + TVector<TExprNode::TPtr> txSettings; + txSettings.push_back(Build<TCoNameValueTuple>(ctx, root.Node->Pos()) + .Name().Build("type") + .Value<TCoAtom>().Build("compute") + .Done() + .Ptr()); + // Build PhysicalTx + auto physTx = Build<TKqpPhysicalTx>(ctx, root.Node->Pos()) + .Stages() + .Add(txStages) + .Build() + .Results() + .Add({dqResult}) + .Build() + .ParamBindings().Build() + .Settings() + .Add(txSettings) + .Build() + .Done() + .Ptr(); + + TVector<TExprNode::TPtr> querySettings; + querySettings.push_back(Build<TCoNameValueTuple>(ctx, root.Node->Pos()) + .Name().Build("type") + .Value<TCoAtom>().Build("data_query") + .Done() + .Ptr()); + + // Build result type + + typeAnnTransformer->Rewind(); + IGraphTransformer::TStatus status(IGraphTransformer::TStatus::Ok); + do { + status = typeAnnTransformer->Transform(dqResult, dqResult, ctx); + } while (status == IGraphTransformer::TStatus::Repeat); + + if (status != IGraphTransformer::TStatus::Ok) { + YQL_CLOG(ERROR, ProviderKqp) << "RBO type annotation failed." << Endl << ctx.IssueManager.GetIssues().ToString(); + return nullptr; + } + + YQL_CLOG(TRACE, CoreDq) << "Inferred final type: " << *dqResult->GetTypeAnn(); + + auto binding = Build<TKqpTxResultBinding>(ctx, root.Node->Pos()) + .Type(ExpandType(root.Node->Pos(), *dqResult->GetTypeAnn(), ctx)) + .TxIndex().Build("0") + .ResultIndex().Build("0") + .Done(); + + // Build Physical query + auto physQuery = Build<TKqpPhysicalQuery>(ctx, root.Node->Pos()) + .Transactions() + .Add({physTx}) + .Build() + .Results() + .Add({binding}) + .Build() + .Settings() + .Add(querySettings) + .Build() + .Done() + .Ptr(); + + /* + typeAnnTransformer->Rewind(); + do { + status = typeAnnTransformer->Transform(physQuery, physQuery, ctx); + } while (status == IGraphTransformer::TStatus::Repeat); + */ + YQL_CLOG(TRACE, CoreDq) << "Final plan built"; - return result; + return physQuery; } } diff --git a/ydb/core/kqp/opt/rbo/kqp_operator.cpp b/ydb/core/kqp/opt/rbo/kqp_operator.cpp index a523f3dead9..f8a6aa91d19 100644 --- a/ydb/core/kqp/opt/rbo/kqp_operator.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_operator.cpp @@ -73,6 +73,7 @@ TExprNode::TPtr AddRenames(TExprNode::TPtr input, TExprContext& ctx, TVector<TIn items.push_back(tuple); } + /* return Build<TCoFlatMap>(ctx, input->Pos()) .Input(input) .Lambda<TCoLambda>() @@ -84,11 +85,23 @@ TExprNode::TPtr AddRenames(TExprNode::TPtr input, TExprContext& ctx, TVector<TIn .Build() .Build() .Done().Ptr(); + */ + + return Build<TCoMap>(ctx, input->Pos()) + .Input(input) + .Lambda<TCoLambda>() + .Args({arg}) + .Body<TCoAsStruct>() + .Add(items) + .Build() + .Build() + .Done().Ptr(); + } TExprNode::TPtr BuildSourceStage(TExprNode::TPtr dqsource, TExprContext& ctx) { auto arg = Build<TCoArgument>(ctx, dqsource->Pos()).Name("arg").Done().Ptr(); - return Build<TDqStage>(ctx, dqsource->Pos()) + return Build<TDqPhyStage>(ctx, dqsource->Pos()) .Inputs() .Add({dqsource}) .Build() @@ -122,13 +135,14 @@ TInfoUnit::TInfoUnit(TString name) { } } -inline bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs) { +bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs) { return lhs.Alias == rhs.Alias && lhs.ColumnName == rhs.ColumnName; } -TExprNode::TPtr TBroadcastConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) { +TExprNode::TPtr TBroadcastConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) { if (FromSourceStage) { inputStage = BuildSourceStage(inputStage, ctx); + newStage = inputStage; } return Build<TDqCnBroadcast>(ctx, node->Pos()) .Output() @@ -138,9 +152,10 @@ TExprNode::TPtr TBroadcastConnection::BuildConnection(TExprNode::TPtr inputStage .Done().Ptr(); } -TExprNode::TPtr TMapConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) { +TExprNode::TPtr TMapConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) { if (FromSourceStage) { inputStage = BuildSourceStage(inputStage, ctx); + newStage = inputStage; } return Build<TDqCnMap>(ctx, node->Pos()) .Output() @@ -150,9 +165,10 @@ TExprNode::TPtr TMapConnection::BuildConnection(TExprNode::TPtr inputStage, TExp .Done().Ptr(); } -TExprNode::TPtr TUnionAllConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) { +TExprNode::TPtr TUnionAllConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) { if (FromSourceStage) { inputStage = BuildSourceStage(inputStage, ctx); + newStage = inputStage; } return Build<TDqCnUnionAll>(ctx, node->Pos()) .Output() @@ -162,11 +178,12 @@ TExprNode::TPtr TUnionAllConnection::BuildConnection(TExprNode::TPtr inputStage, .Done().Ptr(); } -TExprNode::TPtr TShuffleConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) { +TExprNode::TPtr TShuffleConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) { TVector<TCoAtom> keyColumns; if (FromSourceStage) { inputStage = BuildSourceStage(inputStage, ctx); + newStage = inputStage; } for ( auto k : Keys ) { @@ -191,8 +208,9 @@ TExprNode::TPtr TShuffleConnection::BuildConnection(TExprNode::TPtr inputStage, .Done().Ptr(); } -TExprNode::TPtr TSourceConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) { +TExprNode::TPtr TSourceConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) { Y_UNUSED(node); + Y_UNUSED(newStage); Y_UNUSED(ctx); return inputStage; } diff --git a/ydb/core/kqp/opt/rbo/kqp_operator.h b/ydb/core/kqp/opt/rbo/kqp_operator.h index 01647cec3d0..be759b92bde 100644 --- a/ydb/core/kqp/opt/rbo/kqp_operator.h +++ b/ydb/core/kqp/opt/rbo/kqp_operator.h @@ -25,6 +25,10 @@ struct TInfoUnit { TInfoUnit(TString alias, TString column): Alias(alias), ColumnName(column) {} TInfoUnit(TString name); + TString GetFullName() const { + return ((Alias!="") ? ("_alias_" + Alias + ".") : "" ) + ColumnName; + } + TString Alias; TString ColumnName; @@ -37,7 +41,7 @@ struct TInfoUnit { }; }; -inline bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs); +bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs); struct TFilterInfo { TExprNode::TPtr FilterBody; @@ -63,7 +67,7 @@ struct TPhysicalOpProps { struct TConnection { TConnection(TString type, bool fromSourceStage) : Type(type), FromSourceStage(fromSourceStage) {} - virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) = 0; + virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) = 0; virtual ~TConnection() = default; TString Type; @@ -72,19 +76,19 @@ struct TConnection { struct TBroadcastConnection : public TConnection { TBroadcastConnection(bool fromSourceStage) : TConnection("Broadcast", fromSourceStage) {} - virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override; + virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override; }; struct TMapConnection : public TConnection { TMapConnection(bool fromSourceStage) : TConnection("Map", fromSourceStage) {} - virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override; + virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override; }; struct TUnionAllConnection : public TConnection { TUnionAllConnection(bool fromSourceStage) : TConnection("UnionAll", fromSourceStage) {} - virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override; + virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override; }; @@ -93,14 +97,14 @@ struct TShuffleConnection : public TConnection { ,Keys(keys) {} - virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override; + virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override; TVector<TInfoUnit> Keys; }; struct TSourceConnection : public TConnection { TSourceConnection() : TConnection("Source", true) {} - virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override; + virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override; }; diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo.cpp index bf4e5141e0d..1d4a0bb4183 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_rbo.cpp @@ -72,7 +72,7 @@ TExprNode::TPtr TRuleBasedOptimizer::Optimize(TOpRoot & root, TExprContext& ctx } YQL_CLOG(TRACE, CoreDq) << "New RBO finished, generating physical plan"; - return ConvertToPhysical(root, ctx); + return ConvertToPhysical(root, ctx, TypeCtx, TypeAnnTransformer, PeepholeTransformer, Config); } } diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo.h b/ydb/core/kqp/opt/rbo/kqp_rbo.h index e4b8504f2cf..2673c789209 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo.h +++ b/ydb/core/kqp/opt/rbo/kqp_rbo.h @@ -55,10 +55,14 @@ class TRuleBasedOptimizer { TRuleBasedOptimizer(TVector<TRuleBasedStage> stages, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, - const TKikimrConfiguration::TPtr& config) : Stages(stages), + const TKikimrConfiguration::TPtr& config, + TAutoPtr<IGraphTransformer> typeAnnTransformer, + TAutoPtr<IGraphTransformer> peephole) : Stages(stages), KqpCtx(kqpCtx), TypeCtx(typeCtx), - Config(config) {} + Config(config), + TypeAnnTransformer(typeAnnTransformer), + PeepholeTransformer(peephole) {} TExprNode::TPtr Optimize(TOpRoot & root, TExprContext& ctx); @@ -66,9 +70,11 @@ class TRuleBasedOptimizer { const TIntrusivePtr<TKqpOptimizeContext>& KqpCtx; TTypeAnnotationContext& TypeCtx; const TKikimrConfiguration::TPtr& Config; + TAutoPtr<IGraphTransformer> TypeAnnTransformer; + TAutoPtr<IGraphTransformer> PeepholeTransformer; }; -TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx); +TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx, TTypeAnnotationContext& types, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peepholeTransformer, TKikimrConfiguration::TPtr config); } }
\ No newline at end of file diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp index 5847b60b0e7..b17e4fb2940 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp @@ -157,7 +157,7 @@ IGraphTransformer::TStatus TKqpPgRewriteTransformer::DoTransform(TExprNode::TPtr auto status = OptimizeExpr(output, output, [this] (const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { if (TCoPgSelect::Match(node.Get())) { return RewritePgSelect(node, ctx, TypeCtx); - } if (TCoTake::Match(node.Get())) { + } else if (TCoTake::Match(node.Get())) { return PushTakeIntoPlan(node, ctx, TypeCtx); } else { @@ -193,12 +193,83 @@ IGraphTransformer::TStatus TKqpNewRBOTransformer::DoTransform(TExprNode::TPtr in void TKqpNewRBOTransformer::Rewind() { } +IGraphTransformer::TStatus TKqpRBOCleanupTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { + output = input; + TOptimizeExprSettings settings(&TypeCtx); + + Y_UNUSED(ctx); + + /* + auto status = OptimizeExpr(output, output, [] (const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { + Y_UNUSED(ctx); + YQL_CLOG(TRACE, CoreDq) << "Checking if node " << node->UniqueId() << " is list: " << node->IsList(); + + if (node.Get()->IsList() && node.Get()->ChildrenSize()>=1) { + auto child_level_1 = node.Get()->Child(0); + YQL_CLOG(TRACE, CoreDq) << "Matched level 0"; + + if (child_level_1->IsList() && child_level_1->ChildrenSize()>=1) { + auto child_level_2 = child_level_1->Child(0); + YQL_CLOG(TRACE, CoreDq) << "Matched level 1"; + + if (child_level_2->IsList() && child_level_2->ChildrenSize()>=1) { + auto maybeQuery = child_level_2->Child(0); + YQL_CLOG(TRACE, CoreDq) << "Matched level 2"; + + if (TKqpPhysicalQuery::Match(maybeQuery)) { + YQL_CLOG(TRACE, CoreDq) << "Found query node"; + return maybeQuery; + } + } + } + } + return node; + }, ctx, settings); + + */ + + YQL_CLOG(TRACE, CoreDq) << "Cleanup input plan: " << output->Dump(); + + if (output->IsList() && output->ChildrenSize()>=1) { + auto child_level_1 = output->Child(0); + YQL_CLOG(TRACE, CoreDq) << "Matched level 0"; + + if (child_level_1->IsList() && child_level_1->ChildrenSize()>=1) { + auto child_level_2 = child_level_1->Child(0); + YQL_CLOG(TRACE, CoreDq) << "Matched level 1"; + + if (child_level_2->IsList() && child_level_2->ChildrenSize()>=1) { + auto child_level_3 = child_level_2->Child(0); + YQL_CLOG(TRACE, CoreDq) << "Matched level 2"; + + if (child_level_3->IsList() && child_level_2->ChildrenSize()>=1) { + auto maybeQuery = child_level_3->Child(0); + + if (TKqpPhysicalQuery::Match(maybeQuery)) { + YQL_CLOG(TRACE, CoreDq) << "Found query node"; + output = maybeQuery; + } + } + } + } + } + + return IGraphTransformer::TStatus::Ok; +} + +void TKqpRBOCleanupTransformer::Rewind() { +} + TAutoPtr<IGraphTransformer> CreateKqpPgRewriteTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx) { return new TKqpPgRewriteTransformer(kqpCtx, typeCtx); } -TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) { - return new TKqpNewRBOTransformer(kqpCtx, typeCtx, config); +TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peephole) { + return new TKqpNewRBOTransformer(kqpCtx, typeCtx, config, typeAnnTransformer, peephole); +} + +TAutoPtr<IGraphTransformer> CreateKqpRBOCleanupTransformer(TTypeAnnotationContext& typeCtx) { + return new TKqpRBOCleanupTransformer(typeCtx); } } diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h index fab71400ac1..515cf804117 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h @@ -36,10 +36,10 @@ TAutoPtr<IGraphTransformer> CreateKqpPgRewriteTransformer(const TIntrusivePtr<TK class TKqpNewRBOTransformer : public TSyncTransformerBase { public: - TKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) : + TKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peephole) : TypeCtx(typeCtx), KqpCtx(*kqpCtx), - RBO({RuleStage1, RuleStage2}, kqpCtx, typeCtx, config) {} + RBO({RuleStage1, RuleStage2}, kqpCtx, typeCtx, config, typeAnnTransformer, peephole) {} // Main method of the transformer IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; @@ -51,7 +51,23 @@ class TKqpNewRBOTransformer : public TSyncTransformerBase { TRuleBasedOptimizer RBO; }; -TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config); +TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peepholeTransformer); + +class TKqpRBOCleanupTransformer : public TSyncTransformerBase { + public: + TKqpRBOCleanupTransformer(TTypeAnnotationContext& typeCtx) : + TypeCtx(typeCtx) + {} + + // Main method of the transformer + IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + void Rewind() override; + + private: + TTypeAnnotationContext& TypeCtx; +}; + +TAutoPtr<IGraphTransformer> CreateKqpRBOCleanupTransformer(TTypeAnnotationContext& typeCtx); } }
\ No newline at end of file diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 1473fc18515..50f920b9f9d 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -800,6 +800,7 @@ private: outputsCount = 1; } } else { + YQL_CLOG(TRACE, ProviderKqp) << "Stage " << stage.Ptr()->UniqueId() << " type ann kind " << resultType->GetKind(); YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Void, "got " << *resultType); } |