aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavel Velikhov <pavelvelikhov@ydb.tech>2025-05-30 16:48:51 +0300
committerGitHub <noreply@github.com>2025-05-30 13:48:51 +0000
commit7b288ef1f52ea97c71014dac8ee95bbde82bd6e3 (patch)
tree90045d9c1589b38593bf6d3ea10b9af92ecdeb6b
parent657d1b4471360d0182f947e1e56d6a483f49c540 (diff)
downloadydb-7b288ef1f52ea97c71014dac8ee95bbde82bd6e3.tar.gz
Generate final physical plan from rule based optimizer (#19028)
Co-authored-by: Pavel Velikhov <pavelvelikhov@localhost.localdomain>
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp44
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp16
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp54
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole.h4
-rw-r--r--ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp507
-rw-r--r--ydb/core/kqp/opt/rbo/kqp_operator.cpp32
-rw-r--r--ydb/core/kqp/opt/rbo/kqp_operator.h18
-rw-r--r--ydb/core/kqp/opt/rbo/kqp_rbo.cpp2
-rw-r--r--ydb/core/kqp/opt/rbo/kqp_rbo.h12
-rw-r--r--ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp77
-rw-r--r--ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h22
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp1
12 files changed, 646 insertions, 143 deletions
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index d6f0ce370f4..e75f803943a 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -334,7 +334,21 @@ private:
.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery")
.Build(false));
- auto newRBOPhysicalOptimizeTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
+ auto kqpTypeAnnTransformer = TTransformationPipeline(typesCtx)
+ .AddServiceTransformers()
+ .Add(Log("RBOTypeAnnotator"), "LogRBOTypeAnnotator")
+ .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
+ .Build(false);
+
+ auto newRBOPhysicalPeepholeTransformer = TTransformationPipeline(typesCtx)
+ .AddServiceTransformers()
+ .Add(Log("PhysicalPeephole"), "LogPhysicalPeephole")
+ .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
+ .AddPostTypeAnnotation()
+ .Add(GetDqIntegrationPeepholeTransformer(false, typesCtx), "DqIntegrationPeephole")
+ .Build(false);
+
+ auto newRBOPhysicalOptimizeTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("NewRBOPhysicalOptimize"), "LogNewRBOPhysicalOptimize")
.AddPreTypeAnnotation()
@@ -342,19 +356,20 @@ private:
.AddIOAnnotation()
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(),
*typesCtx, Config))
- .Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery")
+ //.Add(CreateKqpCheckQueryTransformer(), "CheckKqlQuery")
.AddPostTypeAnnotation(/* forSubgraph */ true)
- //.AddCommonOptimization()
+ //.AddCommonOptimization()
.Add(CreateKqpPgRewriteTransformer(OptimizeCtx, *typesCtx), "RewritePgSelect")
- .Add(CreateKqpNewRBOTransformer(OptimizeCtx, *typesCtx, Config), "NewRBOTransformer")
+ .Add(CreateKqpNewRBOTransformer(OptimizeCtx, *typesCtx, Config, kqpTypeAnnTransformer, newRBOPhysicalPeepholeTransformer), "NewRBOTransformer")
+ .Add(CreateKqpRBOCleanupTransformer(*typesCtx), "RBOCleanupTransformer")
//.Add(CreatePhysicalDataProposalsInspector(*typesCtx), "ProvidersPhysicalOptimize")
//.Add(CreateKqpFinalizingOptTransformer(OptimizeCtx), "FinalizingOptimize")
//.Add(CreateKqpQueryPhasesTransformer(), "QueryPhases")
//.Add(CreateKqpQueryEffectsTransformer(OptimizeCtx), "QueryEffects")
//.Add(CreateKqpCheckPhysicalQueryTransformer(), "CheckKqlPhysicalQuery")
- .Build(false));
+ .Build(false);
auto physicalBuildTxsTransformer = CreateKqpQueryBlocksTransformer(TTransformationPipeline(typesCtx)
.AddServiceTransformers()
@@ -421,19 +436,6 @@ private:
*typesCtx), *typesCtx, Config), "Peephole")
.Build(false);
- auto newRBOPhysicalPeepholeTransformer = TTransformationPipeline(typesCtx)
- .AddServiceTransformers()
- .Add(Log("PhysicalPeephole"), "LogPhysicalPeephole")
- .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
- .AddPostTypeAnnotation()
- .Add(GetDqIntegrationPeepholeTransformer(false, typesCtx), "DqIntegrationPeephole")
- .Add(
- CreateKqpTxsPeepholeTransformer(
- CreateTypeAnnotationTransformer(
- CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config),
- *typesCtx), *typesCtx, Config), "Peephole")
- .Build(false);
-
TAutoPtr<IGraphTransformer> compilePhysicalQuery(new TCompilePhysicalQueryTransformer(Cluster,
*TransformCtx,
*OptimizeCtx,
@@ -452,15 +454,19 @@ private:
{
TTransformStage{ newRBOPhysicalOptimizeTransformer, "NewRBOPhysicalOptimize", TIssuesIds::DEFAULT_ERROR },
LogStage("NewRBOPhysicalOptimize"),
+ /*
TTransformStage{ newRBOPhysicalBuildTxsTransformer, "NewRBOPhysicalBuildTxs", TIssuesIds::DEFAULT_ERROR },
LogStage("NewRBOPhysicalBuildTxs"),
TTransformStage{ newRBOPhysicalBuildQueryTransformer, "NewRBOPhysicalBuildQuery", TIssuesIds::DEFAULT_ERROR },
LogStage("NewRBOPhysicalBuildQuery"),
+ */
TTransformStage{ CreateSaveExplainTransformerInput(*TransformCtx), "NewRBOSaveExplainTransformerInput", TIssuesIds::DEFAULT_ERROR },
+ /*
TTransformStage{ newRBOPhysicalPeepholeTransformer, "NewRBOPhysicalPeephole", TIssuesIds::DEFAULT_ERROR },
LogStage("NewRBOPhysicalPeephole"),
+ */
TTransformStage{ newRBOCompilePhysicalQuery, "CompilePhysicalQuery", TIssuesIds::DEFAULT_ERROR },
- TTransformStage{ newRBOPreparedExplainTransformer, "NewRBOExplainQuery", TIssuesIds::DEFAULT_ERROR }, // TODO(sk): only on stats mode or if explain-only
+ //TTransformStage{ newRBOPreparedExplainTransformer, "NewRBOExplainQuery", TIssuesIds::DEFAULT_ERROR }, // TODO(sk): only on stats mode or if explain-only
},
false
);
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index d3d8920615b..5e4a89141f7 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -1430,14 +1430,20 @@ TStatus AnnotateKqpPhysicalTx(const TExprNode::TPtr& node, TExprContext& ctx) {
return TStatus::Ok;
}
-TStatus AnnotateKqpPhysicalQuery(const TExprNode::TPtr& node, TExprContext& ctx) {
+// Type-annotates a KqpPhysicalQuery node (expects exactly 3 children).
+//
+// With `enableRBO` set, the query node takes the type annotation of its first result
+// binding. Otherwise, or when the query has no result bindings to take a type from,
+// the node is annotated as Void.
+// Returns TStatus::Error only when the argument count check fails.
+TStatus AnnotateKqpPhysicalQuery(const TExprNode::TPtr& node, TExprContext& ctx, bool enableRBO) {
if (!EnsureArgsCount(*node, 3, ctx)) {
return TStatus::Error;
}
- // TODO: ???
-
- node->SetTypeAnn(ctx.MakeType<TVoidExprType>());
+ // We need to infer the type of physical query for RBO at this time
+ if (enableRBO) {
+     TKqpPhysicalQuery query(node);
+     // Guard against an empty result list: Item(0) must not be taken from it
+     if (query.Results().Size() > 0) {
+         auto type = query.Results().Item(0).Ptr()->GetTypeAnn();
+         node->SetTypeAnn(type);
+         return TStatus::Ok;
+     }
+ }
+
+ node->SetTypeAnn(ctx.MakeType<TVoidExprType>());
return TStatus::Ok;
}
@@ -2207,7 +2213,7 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
}
if (TKqpPhysicalQuery::Match(input.Get())) {
- return AnnotateKqpPhysicalQuery(input, ctx);
+ return AnnotateKqpPhysicalQuery(input, ctx, config->EnableNewRBO);
}
if (TKqpEffects::Match(input.Get())) {
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
index 4e853906c32..ca3132ce128 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
@@ -310,33 +310,6 @@ bool CanPropagateWideBlockThroughChannel(
return true;
}
-TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx,
- IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config,
- bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts)
-{
- TKqpPeepholePipelineConfigurator kqpPeephole(config, disabledOpts);
- TKqpPeepholePipelineFinalConfigurator kqpPeepholeFinal(config);
- TPeepholeSettings peepholeSettings;
- peepholeSettings.CommonConfig = &kqpPeephole;
- peepholeSettings.FinalConfig = &kqpPeepholeFinal;
- peepholeSettings.WithFinalStageRules = withFinalStageRules;
- peepholeSettings.WithNonDeterministicRules = false;
-
- bool hasNonDeterministicFunctions;
- auto status = PeepHoleOptimizeNode(program.Ptr(), newProgram, ctx, typesCtx, &typeAnnTransformer,
- hasNonDeterministicFunctions, peepholeSettings);
- if (status == TStatus::Error) {
- return status;
- }
-
- if (!allowNonDeterministicFunctions && hasNonDeterministicFunctions) {
- ctx.AddError(TIssue(ctx.GetPosition(program.Pos()), "Unexpected non-deterministic functions in KQP program"));
- return TStatus::Error;
- }
-
- return status;
-}
-
TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprContext& ctx,
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages,
TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts)
@@ -623,6 +596,33 @@ private:
} // anonymous namespace
+// Runs the KQP peephole pipeline over a single program.
+//
+// program                        - expression to optimize
+// newProgram                     - receives the optimized expression
+// typeAnnTransformer             - type annotation transformer handed to PeepHoleOptimizeNode
+// allowNonDeterministicFunctions - when false, a program reported as containing
+//                                  non-deterministic functions is rejected with an error
+// withFinalStageRules            - forwarded to TPeepholeSettings::WithFinalStageRules
+// disabledOpts                   - forwarded to the common peephole configurator
+//
+// Non-deterministic rules are always disabled in the peephole settings.
+// Returns the status of PeepHoleOptimizeNode, or TStatus::Error when non-deterministic
+// functions were found but are not allowed.
+TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx,
+    IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config,
+    bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts)
+{
+    TKqpPeepholePipelineConfigurator kqpPeephole(config, disabledOpts);
+    TKqpPeepholePipelineFinalConfigurator kqpPeepholeFinal(config);
+    TPeepholeSettings peepholeSettings;
+    peepholeSettings.CommonConfig = &kqpPeephole;
+    peepholeSettings.FinalConfig = &kqpPeepholeFinal;
+    peepholeSettings.WithFinalStageRules = withFinalStageRules;
+    peepholeSettings.WithNonDeterministicRules = false;
+
+    // Initialised so the check below never reads an indeterminate value
+    bool hasNonDeterministicFunctions = false;
+    auto status = PeepHoleOptimizeNode(program.Ptr(), newProgram, ctx, typesCtx, &typeAnnTransformer,
+        hasNonDeterministicFunctions, peepholeSettings);
+    if (status == TStatus::Error) {
+        return status;
+    }
+
+    if (!allowNonDeterministicFunctions && hasNonDeterministicFunctions) {
+        ctx.AddError(TIssue(ctx.GetPosition(program.Pos()), "Unexpected non-deterministic functions in KQP program"));
+        return TStatus::Error;
+    }
+
+    return status;
+}
+
+
TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(
NYql::IGraphTransformer* typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h
index 2c489d58eaf..a03eb3f382a 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.h
@@ -18,4 +18,8 @@ TAutoPtr<NYql::IGraphTransformer> CreateKqpTxsPeepholeTransformer(
const NYql::TKikimrConfiguration::TPtr& config
);
+NYql::IGraphTransformer::TStatus PeepHoleOptimize(const NYql::NNodes::TExprBase& program, NYql::TExprNode::TPtr& newProgram, NYql::TExprContext& ctx,
+ NYql::IGraphTransformer& typeAnnTransformer, NYql::TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config,
+ bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts);
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp b/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp
index b8584527aac..41259d25812 100644
--- a/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp
+++ b/ydb/core/kqp/opt/rbo/kqp_convert_to_physical.cpp
@@ -1,4 +1,9 @@
#include "kqp_rbo.h"
+
+#include <yql/essentials/core/yql_graph_transformer.h>
+#include <ydb/library/yql/dq/opt/dq_opt_peephole.h>
+#include <ydb/core/kqp/opt/peephole/kqp_opt_peephole.h>
+
#include <yql/essentials/utils/log/log.h>
using namespace NYql::NNodes;
@@ -43,12 +48,328 @@ namespace {
}
}
+ // Projects `input` onto the given info units: builds a FlatMap whose lambda returns
+ // Just(AsStruct(...)) holding one member per entry of `members`, each named by
+ // TInfoUnit::GetFullName() and read from the row argument under the same name.
+ [[maybe_unused]]
+ TExprNode::TPtr ExtractMembers(TExprNode::TPtr input, TExprContext& ctx, TVector<TInfoUnit> members) {
+     const auto pos = input->Pos();
+     auto row = Build<TCoArgument>(ctx, pos).Name("arg").Done().Ptr();
+
+     TVector<TExprBase> fields;
+     fields.reserve(members.size());
+     for (const auto& unit : members) {
+         const auto fullName = unit.GetFullName();
+         fields.push_back(
+             Build<TCoNameValueTuple>(ctx, pos)
+                 .Name().Build(fullName)
+                 .Value<TCoMember>()
+                     .Struct(row)
+                     .Name().Build(fullName)
+                 .Build()
+                 .Done());
+     }
+
+     auto projection = Build<TCoFlatMap>(ctx, pos)
+         .Input(input)
+         .Lambda<TCoLambda>()
+             .Args({row})
+             .Body<TCoJust>()
+                 .Input<TCoAsStruct>()
+                     .Add(fields)
+                 .Build()
+             .Build()
+         .Build()
+         .Done();
+
+     return projection.Ptr();
+ }
+
+ // Runs the KQP peephole optimizer over a single stage lambda.
+ //
+ // Each entry of `inputs` is type-annotated with `typeAnnTransformer`; the resulting types
+ // form the tuple used as ArgsType of a temporary KqpProgram wrapping `stageLambda`.
+ // The program is optimized with final-stage rules enabled and non-deterministic functions
+ // disallowed, and the lambda of the optimized program is returned.
+ //
+ // Returns nullptr when type annotation of an input or the peephole run does not finish
+ // with TStatus::Ok. `peepholeTransformer` is currently unused.
+ // NOTE(review): both transformers are taken as TAutoPtr by value — confirm the ownership
+ // semantics callers expect.
+ [[maybe_unused]]
+ TExprNode::TPtr PeepholeStageLambda(TExprNode::TPtr stageLambda,
+     TVector<TExprNode::TPtr> inputs,
+     TExprContext& ctx,
+     TTypeAnnotationContext& types,
+     TAutoPtr<IGraphTransformer> typeAnnTransformer,
+     TAutoPtr<IGraphTransformer> peepholeTransformer,
+     TKikimrConfiguration::TPtr config) {
+
+     Y_UNUSED(peepholeTransformer);
+
+     // Compute types of inputs to stage lambda
+
+     TVector<const TTypeAnnotationNode*> argTypes;
+     argTypes.reserve(inputs.size());
+
+     for (auto input : inputs) {
+         // The transformer is reused for every input, so reset it each time
+         typeAnnTransformer->Rewind();
+
+         IGraphTransformer::TStatus status(IGraphTransformer::TStatus::Ok);
+         do {
+             status = typeAnnTransformer->Transform(input, input, ctx);
+         } while (status == IGraphTransformer::TStatus::Repeat);
+
+         if (status != IGraphTransformer::TStatus::Ok) {
+             YQL_CLOG(ERROR, ProviderKqp) << "RBO type annotation failed." << Endl << ctx.IssueManager.GetIssues().ToString();
+             return nullptr;
+         }
+
+         argTypes.push_back(input->GetTypeAnn());
+     }
+
+     // Build a temporary KqpProgram to run peephole on
+     auto program = Build<TKqpProgram>(ctx, stageLambda->Pos())
+         .Lambda(stageLambda)
+         .ArgsType(ExpandType(stageLambda->Pos(), *ctx.MakeType<TTupleExprType>(argTypes), ctx))
+         .Done();
+
+     const bool allowNonDeterministicFunctions = false;
+     const bool withFinalStageRules = true;
+     TExprNode::TPtr newProgram;
+     auto status = PeepHoleOptimize(program, newProgram, ctx, typeAnnTransformer.GetRef(), types, config,
+         allowNonDeterministicFunctions, withFinalStageRules, {});
+     if (status != IGraphTransformer::TStatus::Ok) {
+         ctx.AddError(TIssue(ctx.GetPosition(program.Pos()), "Peephole optimization failed for KQP transaction"));
+         return nullptr;
+     }
+
+     return TKqpProgram(newProgram).Lambda().Ptr();
+ }
+
+ // Builds a cross join of `leftInput` and `rightInput` as a nested FlatMap/Map expression.
+ //
+ // Every output row is an AsStruct holding all output IUs of the join's left input (read from
+ // the left row) followed by all output IUs of its right input (read from the right row),
+ // each under TInfoUnit::GetFullName(). The result is wrapped in FromFlow.
+ TExprNode::TPtr BuildCrossJoin(TOpJoin & join, TExprNode::TPtr leftInput, TExprNode::TPtr rightInput, TExprContext& ctx, TPositionHandle pos) {
+
+     TCoArgument leftArg{ctx.NewArgument(pos, "_kqp_left")};
+     TCoArgument rightArg{ctx.NewArgument(pos, "_kqp_right")};
+
+     TVector<TExprNode::TPtr> keys;
+
+     // Appends one (name, Member(row, name)) tuple per info unit, in the order given
+     const auto appendMembers = [&](const auto& units, const TCoArgument& row) {
+         for (const auto& unit : units) {
+             keys.push_back(
+                 Build<TCoNameValueTuple>(ctx, pos)
+                     .Name().Build(unit.GetFullName())
+                     .Value<TCoMember>()
+                         .Struct(row)
+                         .Name().Build(unit.GetFullName())
+                     .Build()
+                     .Done().Ptr());
+         }
+     };
+
+     appendMembers(join.GetLeftInput()->GetOutputIUs(), leftArg);
+     appendMembers(join.GetRightInput()->GetOutputIUs(), rightArg);
+
+     // A stream can be iterated only once, so the right input is condensed into a
+     // single-element stream carrying one list with all right-side rows
+     auto itemArg = Build<TCoArgument>(ctx, pos).Name("item").Done();
+     auto rightAsStreamOfLists = Build<TCoCondense1>(ctx, pos)
+         .Input<TCoToFlow>()
+             .Input(rightInput)
+             .Build()
+         .InitHandler()
+             .Args({itemArg})
+             .Body<TCoAsList>()
+                 .Add(itemArg)
+                 .Build()
+             .Build()
+         .SwitchHandler()
+             .Args({"item", "state"})
+             .Body<TCoBool>()
+                 .Literal().Build("false")
+                 .Build()
+             .Build()
+         .UpdateHandler()
+             .Args({"item", "state"})
+             .Body<TCoAppend>()
+                 .List("state")
+                 .Item("item")
+                 .Build()
+             .Build()
+         .Done();
+
+     auto crossProduct = Build<TCoFlatMap>(ctx, pos)
+         .Input(rightAsStreamOfLists)
+         .Lambda()
+             .Args({"rightAsList"})
+             .Body<TCoFlatMap>()
+                 .Input(leftInput)
+                 .Lambda()
+                     .Args({leftArg})
+                     .Body<TCoMap>()
+                         // `rightAsList` is a list, so it can be re-iterated for every `leftArg`
+                         .Input("rightAsList")
+                         .Lambda()
+                             .Args({rightArg})
+                             .Body<TCoAsStruct>()
+                                 .Add(keys)
+                                 .Build()
+                             .Build()
+                         .Build()
+                     .Build()
+                 .Build()
+             .Build()
+         .Done().Ptr();
+
+     return Build<TCoFromFlow>(ctx, pos)
+         .Input(crossProduct)
+         .Done().Ptr();
+ }
+
+ // Wraps `input` in an ExpandMap whose lambda emits, for the row argument "item",
+ // one Member(item, <full name>) per info unit in `ius`, in the order of `ius`.
+ TExprNode::TPtr ExpandJoinInput(TExprNode::TPtr input, TVector<TInfoUnit> ius, TExprContext& ctx) {
+     const auto emitColumns = [&ius](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
+         for (ui32 column = 0; column < ius.size(); ++column) {
+             lambda.Callable(column, "Member")
+                 .Arg(0, "item")
+                 .Atom(1, ius[column].GetFullName())
+                 .Seal();
+         }
+         return lambda;
+     };
+
+     return ctx.Builder(input->Pos())
+         .Callable("ExpandMap")
+             .Add(0, input)
+             .Lambda(1)
+                 .Param("item")
+                 .Do(emitColumns)
+             .Seal()
+         .Seal().Build();
+ }
+
+ // Wraps `graceJoin` in a NarrowMap whose lambda takes `ius.size()` arguments named "output"
+ // and packs them into an AsStruct: the i-th argument becomes the member named after the
+ // i-th info unit (TInfoUnit::GetFullName()).
+ TExprNode::TPtr CollapseJoinOutput(TExprNode::TPtr graceJoin, TVector<TInfoUnit> ius, TExprContext& ctx) {
+     const auto emitFields = [&ius](TExprNodeBuilder& asStruct) -> TExprNodeBuilder& {
+         for (ui32 column = 0; column < ius.size(); ++column) {
+             asStruct.List(column)
+                 .Atom(0, ius[column].GetFullName())
+                 .Arg(1, "output", column)
+                 .Seal();
+         }
+         return asStruct;
+     };
+
+     return ctx.Builder(graceJoin->Pos())
+         .Callable("NarrowMap")
+             .Add(0, graceJoin)
+             .Lambda(1)
+                 .Params("output", ius.size())
+                 .Callable("AsStruct")
+                     .Do(emitFields)
+                 .Seal()
+             .Seal()
+         .Seal()
+         .Build();
+ }
+
+ // Builds an inner GraceJoinCore over `leftInput` and `rightInput`.
+ //
+ // Both inputs are converted to flows and expanded into wide form with one column per
+ // output IU of the corresponding join input, in IU order. Join keys are passed as column
+ // positions within those IU lists. The renames place left column i at output position i
+ // and right column i at output position i + <number of left IUs>. The join output is
+ // narrowed back into structs named after join.GetOutputIUs() and wrapped in FromFlow.
+ //
+ // Fails via Y_ENSURE when a join key is not found among the output IUs of its input.
+ TExprNode::TPtr BuildGraceJoinCore(TOpJoin & join, TExprNode::TPtr leftInput, TExprNode::TPtr rightInput, TExprContext& ctx, TPositionHandle pos) {
+     TVector<TCoAtom> leftColumnIdxs;
+     TVector<TCoAtom> rightColumnIdxs;
+
+     TVector<TCoAtom> leftRenames, rightRenames;
+
+     TVector<TCoAtom> leftKeyColumnNames;
+     TVector<TCoAtom> rightKeyColumnNames;
+
+     auto leftIUs = join.GetLeftInput()->GetOutputIUs();
+     auto rightIUs = join.GetRightInput()->GetOutputIUs();
+
+     // Renames are flat (source index, target index) pairs
+     auto leftTupleSize = leftIUs.size();
+     for (size_t i=0; i<leftIUs.size(); i++) {
+         leftRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i).Done());
+         leftRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i).Done());
+     }
+
+     for (size_t i=0; i<rightIUs.size(); i++) {
+         rightRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i).Done());
+         rightRenames.push_back(Build<TCoAtom>(ctx, pos).Value(i+leftTupleSize).Done());
+     }
+
+     for (const auto& k : join.JoinKeys) {
+         auto leftIt = std::find(leftIUs.begin(), leftIUs.end(), k.first);
+         auto rightIt = std::find(rightIUs.begin(), rightIUs.end(), k.second);
+
+         // A missing key would otherwise silently produce an out-of-range column index
+         Y_ENSURE(leftIt != leftIUs.end(), "Join key " << k.first.GetFullName() << " not found in left join input");
+         Y_ENSURE(rightIt != rightIUs.end(), "Join key " << k.second.GetFullName() << " not found in right join input");
+
+         auto leftIdx = std::distance(leftIUs.begin(), leftIt);
+         auto rightIdx = std::distance(rightIUs.begin(), rightIt);
+
+         leftColumnIdxs.push_back(Build<TCoAtom>(ctx, pos).Value(leftIdx).Done());
+         rightColumnIdxs.push_back(Build<TCoAtom>(ctx, pos).Value(rightIdx).Done());
+
+         leftKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(k.first.GetFullName()).Done());
+         rightKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(k.second.GetFullName()).Done());
+     }
+
+     // Convert to wide flow
+
+     leftInput = Build<TCoToFlow>(ctx, pos)
+         .Input(leftInput)
+         .Done().Ptr();
+
+     leftInput = ExpandJoinInput(leftInput, leftIUs, ctx);
+
+     rightInput = Build<TCoToFlow>(ctx, pos)
+         .Input(rightInput)
+         .Done().Ptr();
+
+     rightInput = ExpandJoinInput(rightInput, rightIUs, ctx);
+
+     auto graceJoin = Build<TCoGraceJoinCore>(ctx, pos)
+         .LeftInput(leftInput)
+         .RightInput(rightInput)
+         .JoinKind<TCoAtom>().Value("Inner").Build()
+         .LeftKeysColumns<TCoAtomList>().Add(leftColumnIdxs).Build()
+         .RightKeysColumns<TCoAtomList>().Add(rightColumnIdxs).Build()
+         .LeftRenames().Add(leftRenames).Build()
+         .RightRenames().Add(rightRenames).Build()
+         .LeftKeysColumnNames<TCoAtomList>().Add(leftKeyColumnNames).Build()
+         .RightKeysColumnNames<TCoAtomList>().Add(rightKeyColumnNames).Build()
+         .Flags().Build()
+         .Done().Ptr();
+
+     // Convert back to narrow stream
+     return Build<TCoFromFlow>(ctx, pos)
+         .Input(CollapseJoinOutput(graceJoin, join.GetOutputIUs(), ctx))
+         .Done().Ptr();
+ }
+
+ // Builds an inner DqPhyGraceJoin over `leftInput` and `rightInput` with Void labels.
+ //
+ // For every pair in join.JoinKeys a DqJoinKeyTuple is emitted whose labels are
+ // "_alias_" + <alias> and whose columns are the bare column names; the key name lists
+ // hold the corresponding "<label>.<column>" strings.
+ [[maybe_unused]]
+ TExprNode::TPtr BuildDqGraceJoin(TOpJoin & join, TExprNode::TPtr leftInput, TExprNode::TPtr rightInput, TExprContext& ctx, TPositionHandle pos) {
+     TVector<TDqJoinKeyTuple> joinKeys;
+     TVector<TCoAtom> leftKeyColumnNames;
+     TVector<TCoAtom> rightKeyColumnNames;
+
+     for (const auto& keyPair : join.JoinKeys) {
+         const auto& lhs = keyPair.first;
+         const auto& rhs = keyPair.second;
+
+         const TString leftLabel = "_alias_" + lhs.Alias;
+         const TString rightLabel = "_alias_" + rhs.Alias;
+
+         joinKeys.push_back(Build<TDqJoinKeyTuple>(ctx, pos)
+             .LeftLabel().Value(leftLabel).Build()
+             .LeftColumn().Value(lhs.ColumnName).Build()
+             .RightLabel().Value(rightLabel).Build()
+             .RightColumn().Value(rhs.ColumnName).Build()
+             .Done());
+
+         const TString leftFullName = leftLabel + "." + lhs.ColumnName;
+         const TString rightFullName = rightLabel + "." + rhs.ColumnName;
+
+         leftKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(leftFullName).Done());
+         rightKeyColumnNames.push_back(Build<TCoAtom>(ctx, pos).Value(rightFullName).Done());
+     }
+
+     auto graceJoin = Build<TDqPhyGraceJoin>(ctx, pos)
+         .LeftInput(leftInput)
+         .LeftLabel<TCoVoid>().Build()
+         .RightInput(rightInput)
+         .RightLabel<TCoVoid>().Build()
+         .JoinType<TCoAtom>().Value("Inner").Build()
+         .JoinKeys<TDqJoinKeyTupleList>().Add(joinKeys).Build()
+         .LeftJoinKeyNames<TCoAtomList>().Add(leftKeyColumnNames).Build()
+         .RightJoinKeyNames<TCoAtomList>().Add(rightKeyColumnNames).Build()
+         .Done();
+
+     return graceJoin.Ptr();
+ }
+
+
}
namespace NKikimr {
namespace NKqp {
-TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
+TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx, TTypeAnnotationContext& types, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peepholeTransformer, TKikimrConfiguration::TPtr config) {
+ Y_UNUSED(types);
+ Y_UNUSED(peepholeTransformer);
+ Y_UNUSED(config);
+
THashMap<int, TExprNode::TPtr> stages;
THashMap<int, TVector<TExprNode::TPtr>> stageArgs;
auto & graph = root.PlanProps.StageGraph;
@@ -112,40 +433,45 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
auto filter = TKqpOpFilter(op->Node);
auto filterBody = filter.Lambda().Body();
- auto arg = Build<TCoArgument>(ctx, root.Node->Pos()).Name("arg").Done().Ptr();
- auto newFilterBody = ReplaceArg(filterBody.Ptr(), arg, ctx);
+ auto filter_arg = Build<TCoArgument>(ctx, root.Node->Pos()).Name("arg").Done().Ptr();
+ auto map_arg = Build<TCoArgument>(ctx, root.Node->Pos()).Name("arg").Done().Ptr();
+
+ auto newFilterBody = ReplaceArg(filterBody.Ptr(), filter_arg, ctx);
newFilterBody = ctx.Builder(root.Node->Pos()).Callable("FromPg").Add(0, newFilterBody).Seal().Build();
TVector<TExprBase> items;
for (auto iu : op->GetOutputIUs()) {
- auto memberName = "_alias_" + iu.Alias + "." + iu.ColumnName;
+ auto memberName = iu.GetFullName();
auto tuple = Build<TCoNameValueTuple>(ctx, root.Node->Pos())
.Name().Build(memberName)
.Value<TCoMember>()
- .Struct(arg)
+ .Struct(map_arg)
.Name().Build(memberName)
.Build()
.Done();
items.push_back(tuple);
}
-
- currentStageBody = Build<TCoFlatMap>(ctx, root.Node->Pos())
- .Input(TExprBase(currentStageBody))
- .Lambda<TCoLambda>()
- .Args({arg})
- .Body<TCoOptionalIf>()
- .Predicate<TCoCoalesce>()
+
+ currentStageBody = Build<TCoMap>(ctx, root.Node->Pos())
+ .Input<TCoFilter>()
+ .Input(TExprBase(currentStageBody))
+ .Lambda<TCoLambda>()
+ .Args({filter_arg})
+ .Body<TCoCoalesce>()
.Predicate(newFilterBody)
.Value<TCoBool>()
.Literal().Build("false")
.Build()
.Build()
- .Value<TCoAsStruct>()
- .Add(items)
.Build()
.Build()
+ .Lambda<TCoLambda>()
+ .Args({map_arg})
+ .Body<TCoAsStruct>()
+ .Add(items)
+ .Build()
.Build()
.Done().Ptr();
@@ -175,14 +501,12 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
items.push_back(tuple);
}
- currentStageBody = Build<TCoFlatMap>(ctx, root.Node->Pos())
+ currentStageBody = Build<TCoMap>(ctx, root.Node->Pos())
.Input(TExprBase(currentStageBody))
.Lambda<TCoLambda>()
.Args({arg})
- .Body<TCoJust>()
- .Input<TCoAsStruct>()
- .Add(items)
- .Build()
+ .Body<TCoAsStruct>()
+ .Add(items)
.Build()
.Build()
.Done().Ptr();
@@ -214,51 +538,13 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
stageArgs[opStageId].push_back(rightArg);
if (join->JoinKind == "Cross") {
- currentStageBody = Build<TDqPhyCrossJoin>(ctx, root.Node->Pos())
- .LeftInput(leftInput)
- .LeftLabel<TCoVoid>().Build()
- .RightInput(rightInput)
- .RightLabel<TCoVoid>().Build()
- .JoinType<TCoAtom>().Value("Cross").Build()
- .JoinKeys<TDqJoinKeyTupleList>().Build()
- .LeftJoinKeyNames<TCoAtomList>().Build()
- .RightJoinKeyNames<TCoAtomList>().Build()
- .Done().Ptr();
+ currentStageBody = BuildCrossJoin(*join, leftInput, rightInput, ctx, root.Node->Pos());
}
else if (join->JoinKind == "Inner") {
- TVector<TDqJoinKeyTuple> joinKeys;
- TVector<TCoAtom> leftKeyColumnNames;
- TVector<TCoAtom> rightKeyColumnNames;
-
- for (auto p : join->JoinKeys) {
- TString leftFullName = "_alias_" + p.first.Alias + "." + p.first.ColumnName;
- TString rightFullName = "_alias_" + p.second.Alias + "." + p.second.ColumnName;
-
- joinKeys.push_back(Build<TDqJoinKeyTuple>(ctx, root.Node->Pos())
- .LeftLabel().Value("_alias_" + p.first.Alias).Build()
- .LeftColumn().Value(p.first.ColumnName).Build()
- .RightLabel().Value("_alias_" + p.second.Alias).Build()
- .RightColumn().Value(p.second.ColumnName).Build()
- .Done());
-
- leftKeyColumnNames.push_back(Build<TCoAtom>(ctx, root.Node->Pos()).Value(leftFullName).Done());
- rightKeyColumnNames.push_back(Build<TCoAtom>(ctx, root.Node->Pos()).Value(rightFullName).Done());
- }
-
- currentStageBody = Build<TDqPhyGraceJoin>(ctx, root.Node->Pos())
- .LeftInput(leftInput)
- .LeftLabel<TCoVoid>().Build()
- .RightInput(rightInput)
- .RightLabel<TCoVoid>().Build()
- .JoinType<TCoAtom>().Value("Inner").Build()
- .JoinKeys<TDqJoinKeyTupleList>().Add(joinKeys).Build()
- .LeftJoinKeyNames<TCoAtomList>().Add(leftKeyColumnNames).Build()
- .RightJoinKeyNames<TCoAtomList>().Add(rightKeyColumnNames).Build()
- .Done().Ptr();
- }
+ currentStageBody = BuildGraceJoinCore(*join, leftInput, rightInput, ctx, root.Node->Pos());
+ }
else {
Y_ENSURE(false, "Unsupported join kind");
-
}
stages[opStageId] = currentStageBody;
@@ -271,9 +557,10 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
// We need to build up stages in a topological sort order
graph.TopologicalSort();
-
THashMap<int, TExprNode::TPtr> finalizedStages;
+ TVector<TExprNode::TPtr> txStages;
+
auto stageIds = graph.StageIds;
auto stageInputIds = graph.StageInputs;
@@ -286,7 +573,11 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
auto inputStage = finalizedStages.at(inputStageId);
auto c = graph.GetConnection(inputStageId, id);
YQL_CLOG(TRACE, CoreDq) << "Building connection: " << inputStageId << "->" << id << ", " << c->Type;
- auto conn = c->BuildConnection(inputStage, root.Node, ctx);
+ TExprNode::TPtr newStage;
+ auto conn = c->BuildConnection(inputStage, root.Node, newStage, ctx);
+ if (newStage) {
+ txStages.push_back(newStage);
+ }
YQL_CLOG(TRACE, CoreDq) << "Built connection: " << inputStageId << "->" << id << ", " << c->Type;
inputs.push_back(conn);
}
@@ -295,7 +586,7 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
stage = stages.at(id);
}
else {
- stage = Build<TDqStage>(ctx, root.Node->Pos())
+ stage = Build<TDqPhyStage>(ctx, root.Node->Pos())
.Inputs()
.Add(inputs)
.Build()
@@ -304,7 +595,11 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
.Body(stages.at(id))
.Build()
.Settings().Build()
- .Done().Ptr();
+ .Done()
+ .Ptr();
+
+ txStages.push_back(stage);
+ YQL_CLOG(TRACE, CoreDq) << "Added stage " << stage->UniqueId();
}
finalizedStages[id] = stage;
@@ -314,15 +609,91 @@ TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx) {
// Build a union all for the last stage
int lastStageIdx = stageIds[stageIds.size()-1];
- auto result = Build<TDqCnUnionAll>(ctx, root.Node->Pos())
+ auto lastStage = finalizedStages.at(lastStageIdx);
+
+ // wrap in DqResult
+ auto dqResult = Build<TDqCnResult>(ctx, root.Node->Pos())
.Output()
- .Stage(finalizedStages.at(lastStageIdx))
+ .Stage(lastStage)
.Index().Build("0")
.Build()
- .Done().Ptr();
+ .ColumnHints().Build()
+ .Done()
+ .Ptr();
+
+ TVector<TExprNode::TPtr> txSettings;
+ txSettings.push_back(Build<TCoNameValueTuple>(ctx, root.Node->Pos())
+ .Name().Build("type")
+ .Value<TCoAtom>().Build("compute")
+ .Done()
+ .Ptr());
+ // Build PhysicalTx
+ auto physTx = Build<TKqpPhysicalTx>(ctx, root.Node->Pos())
+ .Stages()
+ .Add(txStages)
+ .Build()
+ .Results()
+ .Add({dqResult})
+ .Build()
+ .ParamBindings().Build()
+ .Settings()
+ .Add(txSettings)
+ .Build()
+ .Done()
+ .Ptr();
+
+ TVector<TExprNode::TPtr> querySettings;
+ querySettings.push_back(Build<TCoNameValueTuple>(ctx, root.Node->Pos())
+ .Name().Build("type")
+ .Value<TCoAtom>().Build("data_query")
+ .Done()
+ .Ptr());
+
+ // Build result type
+
+ typeAnnTransformer->Rewind();
+ IGraphTransformer::TStatus status(IGraphTransformer::TStatus::Ok);
+ do {
+ status = typeAnnTransformer->Transform(dqResult, dqResult, ctx);
+ } while (status == IGraphTransformer::TStatus::Repeat);
+
+ if (status != IGraphTransformer::TStatus::Ok) {
+ YQL_CLOG(ERROR, ProviderKqp) << "RBO type annotation failed." << Endl << ctx.IssueManager.GetIssues().ToString();
+ return nullptr;
+ }
+
+ YQL_CLOG(TRACE, CoreDq) << "Inferred final type: " << *dqResult->GetTypeAnn();
+
+ auto binding = Build<TKqpTxResultBinding>(ctx, root.Node->Pos())
+ .Type(ExpandType(root.Node->Pos(), *dqResult->GetTypeAnn(), ctx))
+ .TxIndex().Build("0")
+ .ResultIndex().Build("0")
+ .Done();
+
+ // Build Physical query
+ auto physQuery = Build<TKqpPhysicalQuery>(ctx, root.Node->Pos())
+ .Transactions()
+ .Add({physTx})
+ .Build()
+ .Results()
+ .Add({binding})
+ .Build()
+ .Settings()
+ .Add(querySettings)
+ .Build()
+ .Done()
+ .Ptr();
+
+ /*
+ typeAnnTransformer->Rewind();
+ do {
+ status = typeAnnTransformer->Transform(physQuery, physQuery, ctx);
+ } while (status == IGraphTransformer::TStatus::Repeat);
+ */
+
YQL_CLOG(TRACE, CoreDq) << "Final plan built";
- return result;
+ return physQuery;
}
}
diff --git a/ydb/core/kqp/opt/rbo/kqp_operator.cpp b/ydb/core/kqp/opt/rbo/kqp_operator.cpp
index a523f3dead9..f8a6aa91d19 100644
--- a/ydb/core/kqp/opt/rbo/kqp_operator.cpp
+++ b/ydb/core/kqp/opt/rbo/kqp_operator.cpp
@@ -73,6 +73,7 @@ TExprNode::TPtr AddRenames(TExprNode::TPtr input, TExprContext& ctx, TVector<TIn
items.push_back(tuple);
}
+ /*
return Build<TCoFlatMap>(ctx, input->Pos())
.Input(input)
.Lambda<TCoLambda>()
@@ -84,11 +85,23 @@ TExprNode::TPtr AddRenames(TExprNode::TPtr input, TExprContext& ctx, TVector<TIn
.Build()
.Build()
.Done().Ptr();
+ */
+
+ return Build<TCoMap>(ctx, input->Pos())
+ .Input(input)
+ .Lambda<TCoLambda>()
+ .Args({arg})
+ .Body<TCoAsStruct>()
+ .Add(items)
+ .Build()
+ .Build()
+ .Done().Ptr();
+
}
TExprNode::TPtr BuildSourceStage(TExprNode::TPtr dqsource, TExprContext& ctx) {
auto arg = Build<TCoArgument>(ctx, dqsource->Pos()).Name("arg").Done().Ptr();
- return Build<TDqStage>(ctx, dqsource->Pos())
+ return Build<TDqPhyStage>(ctx, dqsource->Pos())
.Inputs()
.Add({dqsource})
.Build()
@@ -122,13 +135,14 @@ TInfoUnit::TInfoUnit(TString name) {
}
}
-inline bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs) {
+// Two info units compare equal when both their alias and their column name match.
+bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs) {
return lhs.Alias == rhs.Alias && lhs.ColumnName == rhs.ColumnName;
}
-TExprNode::TPtr TBroadcastConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) {
+TExprNode::TPtr TBroadcastConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) {
if (FromSourceStage) {
inputStage = BuildSourceStage(inputStage, ctx);
+ newStage = inputStage;
}
return Build<TDqCnBroadcast>(ctx, node->Pos())
.Output()
@@ -138,9 +152,10 @@ TExprNode::TPtr TBroadcastConnection::BuildConnection(TExprNode::TPtr inputStage
.Done().Ptr();
}
-TExprNode::TPtr TMapConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) {
+TExprNode::TPtr TMapConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) {
if (FromSourceStage) {
inputStage = BuildSourceStage(inputStage, ctx);
+ newStage = inputStage;
}
return Build<TDqCnMap>(ctx, node->Pos())
.Output()
@@ -150,9 +165,10 @@ TExprNode::TPtr TMapConnection::BuildConnection(TExprNode::TPtr inputStage, TExp
.Done().Ptr();
}
-TExprNode::TPtr TUnionAllConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) {
+TExprNode::TPtr TUnionAllConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) {
if (FromSourceStage) {
inputStage = BuildSourceStage(inputStage, ctx);
+ newStage = inputStage;
}
return Build<TDqCnUnionAll>(ctx, node->Pos())
.Output()
@@ -162,11 +178,12 @@ TExprNode::TPtr TUnionAllConnection::BuildConnection(TExprNode::TPtr inputStage,
.Done().Ptr();
}
-TExprNode::TPtr TShuffleConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) {
+TExprNode::TPtr TShuffleConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) {
TVector<TCoAtom> keyColumns;
if (FromSourceStage) {
inputStage = BuildSourceStage(inputStage, ctx);
+ newStage = inputStage;
}
for ( auto k : Keys ) {
@@ -191,8 +208,9 @@ TExprNode::TPtr TShuffleConnection::BuildConnection(TExprNode::TPtr inputStage,
.Done().Ptr();
}
-TExprNode::TPtr TSourceConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) {
+TExprNode::TPtr TSourceConnection::BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) {
Y_UNUSED(node);
+ Y_UNUSED(newStage);
Y_UNUSED(ctx);
return inputStage;
}
diff --git a/ydb/core/kqp/opt/rbo/kqp_operator.h b/ydb/core/kqp/opt/rbo/kqp_operator.h
index 01647cec3d0..be759b92bde 100644
--- a/ydb/core/kqp/opt/rbo/kqp_operator.h
+++ b/ydb/core/kqp/opt/rbo/kqp_operator.h
@@ -25,6 +25,10 @@ struct TInfoUnit {
TInfoUnit(TString alias, TString column): Alias(alias), ColumnName(column) {}
TInfoUnit(TString name);
+    // Returns the column name qualified by its alias:
+    // "_alias_<Alias>.<ColumnName>" when Alias is non-empty, otherwise just ColumnName.
+    TString GetFullName() const {
+        if (Alias.empty()) {
+            return ColumnName;
+        }
+        return "_alias_" + Alias + "." + ColumnName;
+    }
+
TString Alias;
TString ColumnName;
@@ -37,7 +41,7 @@ struct TInfoUnit {
};
};
-inline bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs);
+bool operator == (const TInfoUnit& lhs, const TInfoUnit& rhs);
struct TFilterInfo {
TExprNode::TPtr FilterBody;
@@ -63,7 +67,7 @@ struct TPhysicalOpProps {
struct TConnection {
TConnection(TString type, bool fromSourceStage) : Type(type), FromSourceStage(fromSourceStage) {}
- virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) = 0;
+ virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) = 0;
virtual ~TConnection() = default;
TString Type;
@@ -72,19 +76,19 @@ struct TConnection {
struct TBroadcastConnection : public TConnection {
TBroadcastConnection(bool fromSourceStage) : TConnection("Broadcast", fromSourceStage) {}
- virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override;
+ virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override;
};
struct TMapConnection : public TConnection {
TMapConnection(bool fromSourceStage) : TConnection("Map", fromSourceStage) {}
- virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override;
+ virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override;
};
struct TUnionAllConnection : public TConnection {
TUnionAllConnection(bool fromSourceStage) : TConnection("UnionAll", fromSourceStage) {}
- virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override;
+ virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override;
};
@@ -93,14 +97,14 @@ struct TShuffleConnection : public TConnection {
,Keys(keys)
{}
- virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override;
+ virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override;
TVector<TInfoUnit> Keys;
};
struct TSourceConnection : public TConnection {
TSourceConnection() : TConnection("Source", true) {}
- virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprContext& ctx) override;
+ virtual TExprNode::TPtr BuildConnection(TExprNode::TPtr inputStage, TExprNode::TPtr & node, TExprNode::TPtr & newStage, TExprContext& ctx) override;
};
diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo.cpp
index bf4e5141e0d..1d4a0bb4183 100644
--- a/ydb/core/kqp/opt/rbo/kqp_rbo.cpp
+++ b/ydb/core/kqp/opt/rbo/kqp_rbo.cpp
@@ -72,7 +72,7 @@ TExprNode::TPtr TRuleBasedOptimizer::Optimize(TOpRoot & root, TExprContext& ctx
}
YQL_CLOG(TRACE, CoreDq) << "New RBO finished, generating physical plan";
- return ConvertToPhysical(root, ctx);
+ return ConvertToPhysical(root, ctx, TypeCtx, TypeAnnTransformer, PeepholeTransformer, Config);
}
}
diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo.h b/ydb/core/kqp/opt/rbo/kqp_rbo.h
index e4b8504f2cf..2673c789209 100644
--- a/ydb/core/kqp/opt/rbo/kqp_rbo.h
+++ b/ydb/core/kqp/opt/rbo/kqp_rbo.h
@@ -55,10 +55,14 @@ class TRuleBasedOptimizer {
TRuleBasedOptimizer(TVector<TRuleBasedStage> stages,
const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
TTypeAnnotationContext& typeCtx,
- const TKikimrConfiguration::TPtr& config) : Stages(stages),
+ const TKikimrConfiguration::TPtr& config,
+ TAutoPtr<IGraphTransformer> typeAnnTransformer,
+ TAutoPtr<IGraphTransformer> peephole) : Stages(stages),
KqpCtx(kqpCtx),
TypeCtx(typeCtx),
- Config(config) {}
+ Config(config),
+ TypeAnnTransformer(typeAnnTransformer),
+ PeepholeTransformer(peephole) {}
TExprNode::TPtr Optimize(TOpRoot & root, TExprContext& ctx);
@@ -66,9 +70,11 @@ class TRuleBasedOptimizer {
const TIntrusivePtr<TKqpOptimizeContext>& KqpCtx;
TTypeAnnotationContext& TypeCtx;
const TKikimrConfiguration::TPtr& Config;
+ TAutoPtr<IGraphTransformer> TypeAnnTransformer;
+ TAutoPtr<IGraphTransformer> PeepholeTransformer;
};
-TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx);
+// Produces the expression returned by TRuleBasedOptimizer::Optimize once the rule stages have finished
+// (the optimizer logs "generating physical plan" right before calling it). Defined outside this header.
+// NOTE(review): both TAutoPtr parameters are taken by value while the optimizer passes its own members —
+// presumably a TAutoPtr copy transfers ownership, which would leave those members empty after the first call; confirm.
+TExprNode::TPtr ConvertToPhysical(TOpRoot & root, TExprContext& ctx, TTypeAnnotationContext& types, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peepholeTransformer, TKikimrConfiguration::TPtr config);
}
} \ No newline at end of file
diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp
index 5847b60b0e7..b17e4fb2940 100644
--- a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp
+++ b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.cpp
@@ -157,7 +157,7 @@ IGraphTransformer::TStatus TKqpPgRewriteTransformer::DoTransform(TExprNode::TPtr
auto status = OptimizeExpr(output, output, [this] (const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
if (TCoPgSelect::Match(node.Get())) {
return RewritePgSelect(node, ctx, TypeCtx);
- } if (TCoTake::Match(node.Get())) {
+ } else if (TCoTake::Match(node.Get())) {
return PushTakeIntoPlan(node, ctx, TypeCtx);
}
else {
@@ -193,12 +193,83 @@ IGraphTransformer::TStatus TKqpNewRBOTransformer::DoTransform(TExprNode::TPtr in
void TKqpNewRBOTransformer::Rewind() {
}
+// Unwraps a physical query from the list nodes wrapped around it.
+//
+// Starting at `input`, the first child is followed through four nested list
+// levels. If the node reached that way matches TKqpPhysicalQuery, `output` is
+// set to that node; in every other case `output` stays equal to `input`.
+//
+// @param input   root of the expression to clean up
+// @param output  receives either `input` or the unwrapped query node
+// @param ctx     unused
+// @return always IGraphTransformer::TStatus::Ok
+IGraphTransformer::TStatus TKqpRBOCleanupTransformer::DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
+    output = input;
+    // Not referenced by the code below.
+    TOptimizeExprSettings settings(&TypeCtx);
+
+    Y_UNUSED(ctx);
+
+    YQL_CLOG(TRACE, CoreDq) << "Cleanup input plan: " << output->Dump();
+
+    if (output->IsList() && output->ChildrenSize() >= 1) {
+        auto child_level_1 = output->Child(0);
+        YQL_CLOG(TRACE, CoreDq) << "Matched level 0";
+
+        if (child_level_1->IsList() && child_level_1->ChildrenSize() >= 1) {
+            auto child_level_2 = child_level_1->Child(0);
+            YQL_CLOG(TRACE, CoreDq) << "Matched level 1";
+
+            if (child_level_2->IsList() && child_level_2->ChildrenSize() >= 1) {
+                auto child_level_3 = child_level_2->Child(0);
+                YQL_CLOG(TRACE, CoreDq) << "Matched level 2";
+
+                // The size check must be made on child_level_3 itself: it is the
+                // node whose first child is read on the next line.
+                if (child_level_3->IsList() && child_level_3->ChildrenSize() >= 1) {
+                    auto maybeQuery = child_level_3->Child(0);
+
+                    if (TKqpPhysicalQuery::Match(maybeQuery)) {
+                        YQL_CLOG(TRACE, CoreDq) << "Found query node";
+                        output = maybeQuery;
+                    }
+                }
+            }
+        }
+    }
+
+    return IGraphTransformer::TStatus::Ok;
+}
+
+// Intentionally empty: DoTransform does not store anything in the transformer, so there is nothing to reset.
+void TKqpRBOCleanupTransformer::Rewind() {
+}
+
TAutoPtr<IGraphTransformer> CreateKqpPgRewriteTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx) {
return new TKqpPgRewriteTransformer(kqpCtx, typeCtx);
}
-TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) {
- return new TKqpNewRBOTransformer(kqpCtx, typeCtx, config);
+// Factory: forwards every argument unchanged to the TKqpNewRBOTransformer constructor.
+// NOTE(review): the two TAutoPtr arguments are handed on by value — presumably each hand-off transfers ownership; confirm against TAutoPtr's copy semantics.
+TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peephole) {
+ return new TKqpNewRBOTransformer(kqpCtx, typeCtx, config, typeAnnTransformer, peephole);
+}
+
+// Factory: allocates a TKqpRBOCleanupTransformer constructed from `typeCtx` and returns it as an IGraphTransformer.
+TAutoPtr<IGraphTransformer> CreateKqpRBOCleanupTransformer(TTypeAnnotationContext& typeCtx) {
+ return new TKqpRBOCleanupTransformer(typeCtx);
}
}
diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h
index fab71400ac1..515cf804117 100644
--- a/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h
+++ b/ydb/core/kqp/opt/rbo/kqp_rbo_transformer.h
@@ -36,10 +36,10 @@ TAutoPtr<IGraphTransformer> CreateKqpPgRewriteTransformer(const TIntrusivePtr<TK
class TKqpNewRBOTransformer : public TSyncTransformerBase {
public:
- TKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config) :
+ TKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peephole) :
TypeCtx(typeCtx),
KqpCtx(*kqpCtx),
- RBO({RuleStage1, RuleStage2}, kqpCtx, typeCtx, config) {}
+ RBO({RuleStage1, RuleStage2}, kqpCtx, typeCtx, config, typeAnnTransformer, peephole) {}
// Main method of the transformer
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
@@ -51,7 +51,23 @@ class TKqpNewRBOTransformer : public TSyncTransformerBase {
TRuleBasedOptimizer RBO;
};
-TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config);
+TAutoPtr<IGraphTransformer> CreateKqpNewRBOTransformer(const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, TTypeAnnotationContext& typeCtx, const TKikimrConfiguration::TPtr& config, TAutoPtr<IGraphTransformer> typeAnnTransformer, TAutoPtr<IGraphTransformer> peepholeTransformer);
+
+// Synchronous transformer that runs after the new RBO: its DoTransform replaces
+// a list-wrapped TKqpPhysicalQuery with the query node itself.
+class TKqpRBOCleanupTransformer : public TSyncTransformerBase {
+    public:
+        // `explicit` prevents an accidental implicit conversion from a type
+        // annotation context to a transformer.
+        explicit TKqpRBOCleanupTransformer(TTypeAnnotationContext& typeCtx) :
+            TypeCtx(typeCtx)
+        {}
+
+        // Main method of the transformer
+        IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;
+        void Rewind() override;
+
+    private:
+        // Held by reference, so the context must outlive this transformer.
+        TTypeAnnotationContext& TypeCtx;
+};
+
+TAutoPtr<IGraphTransformer> CreateKqpRBOCleanupTransformer(TTypeAnnotationContext& typeCtx);
}
} \ No newline at end of file
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 1473fc18515..50f920b9f9d 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -800,6 +800,7 @@ private:
outputsCount = 1;
}
} else {
+ YQL_CLOG(TRACE, ProviderKqp) << "Stage " << stage.Ptr()->UniqueId() << " type ann kind " << resultType->GetKind();
YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Void, "got " << *resultType);
}