aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-02-10 16:34:41 +0300
committervvvv <vvvv@ydb.tech>2023-02-10 16:34:41 +0300
commitd1bb6cba6e714ede15b979cf7e5fe384551d3488 (patch)
treea7fec8f3226868b3c18d4b50696d5a425e3c6e6d
parent45f5cd6f4734b7204a9186f9b476e30dbfa839ad (diff)
downloadydb-d1bb6cba6e714ede15b979cf7e5fe384551d3488.tar.gz
support of blocks in DqPhyLength
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp11
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_peephole.cpp31
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_peephole.h3
-rw-r--r--ydb/library/yql/providers/dq/opt/dqs_opt.cpp16
-rw-r--r--ydb/library/yql/providers/dq/opt/dqs_opt.h4
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp4
6 files changed, 51 insertions, 18 deletions
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
index 0f2516dc747..61fd2288539 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
@@ -85,8 +85,9 @@ TStatus ReplaceNonDetFunctionsWithParams(TExprNode::TPtr& input, TExprContext& c
class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
public:
- TKqpPeepholeTransformer()
+ TKqpPeepholeTransformer(TTypeAnnotationContext& typesCtx)
: TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {})
+ , TypesCtx(typesCtx)
{
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
@@ -137,10 +138,13 @@ protected:
}
TMaybeNode<TExprBase> RewriteLength(TExprBase node, TExprContext& ctx) {
- TExprBase output = DqPeepholeRewriteLength(node, ctx);
+ TExprBase output = DqPeepholeRewriteLength(node, ctx, TypesCtx);
DumpAppliedRule("RewriteLength", node.Ptr(), output.Ptr(), ctx);
return output;
}
+
+private:
+ TTypeAnnotationContext& TypesCtx;
};
struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
@@ -155,8 +159,7 @@ struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
}
void AfterOptimize(TTransformationPipeline* pipeline) const override {
-
- pipeline->Add(new TKqpPeepholeTransformer(), "KqpPeephole");
+ pipeline->Add(new TKqpPeepholeTransformer(*pipeline->GetTypeAnnotationContext()), "KqpPeephole");
}
private:
diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
index 4d2fe490757..651882897ec 100644
--- a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
@@ -672,12 +672,41 @@ NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExp
return NNodes::TExprBase(ctx.ChangeChildren(node.Ref(), std::move(children)));
}
-NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprContext& ctx) {
+NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
if (!node.Maybe<TDqPhyLength>()) {
return node;
}
auto dqPhyLength = node.Cast<TDqPhyLength>();
+ if (typesCtx.UseBlocks) {
+ return NNodes::TExprBase(ctx.Builder(node.Pos())
+ .Callable("NarrowMap")
+ .Callable(0, "BlockCombineAll")
+ .Callable(0, "WideToBlocks")
+ .Add(0, MakeExpandMap(node.Pos(), {}, dqPhyLength.Input().Ptr(), ctx))
+ .Seal()
+ .Callable(1, "Void")
+ .Seal()
+ .List(2)
+ .List(0)
+ .Callable(0, "AggBlockApply")
+ .Atom(0, "count_all")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Lambda(1)
+ .Param("value")
+ .Callable("AsStruct")
+ .List(0)
+ .Atom(0, dqPhyLength.Name())
+ .Arg(1, "value")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build());
+ }
return Build<TCoCondense>(ctx, node.Pos())
.Input(dqPhyLength.Input())
diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.h b/ydb/library/yql/dq/opt/dq_opt_peephole.h
index e2bf494c0e3..a8951dd12e8 100644
--- a/ydb/library/yql/dq/opt/dq_opt_peephole.h
+++ b/ydb/library/yql/dq/opt/dq_opt_peephole.h
@@ -4,6 +4,7 @@
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
namespace NYql::NDq {
@@ -13,6 +14,6 @@ NNodes::TExprBase DqPeepholeRewriteMapJoin(const NNodes::TExprBase& node, TExprC
NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewritePureJoin(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx);
-NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprContext& ctx);
+NNodes::TExprBase DqPeepholeRewriteLength(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
index 96684c27cee..8cba4d0601a 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
@@ -45,12 +45,12 @@ namespace NYql::NDqs {
using TStatus = IGraphTransformer::TStatus;
- THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer() {
- return CreateFunctorTransformer([](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer(TTypeAnnotationContext& typesCtx) {
+ return CreateFunctorTransformer([&typesCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
TOptimizeExprSettings optSettings{nullptr};
optSettings.VisitLambdas = true;
return OptimizeExprEx(input, output,
- [](const TExprNode::TPtr& inputExpr, TExprContext& ctx, IOptimizationContext&) {
+ [&typesCtx](const TExprNode::TPtr& inputExpr, TExprContext& ctx, IOptimizationContext&) {
TExprBase node{inputExpr};
PERFORM_RULE(DqPeepholeRewriteCrossJoin, node, ctx);
PERFORM_RULE(DqPeepholeRewriteJoinDict, node, ctx);
@@ -58,15 +58,15 @@ namespace NYql::NDqs {
PERFORM_RULE(DqPeepholeRewritePureJoin, node, ctx);
PERFORM_RULE(DqPeepholeRewriteReplicate, node, ctx);
PERFORM_RULE(DqPeepholeDropUnusedInputs, node, ctx);
- PERFORM_RULE(DqPeepholeRewriteLength, node, ctx);
+ PERFORM_RULE(DqPeepholeRewriteLength, node, ctx, typesCtx);
return inputExpr;
}, ctx, optSettings);
});
}
- THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) {
- return CreateFunctorTransformer([typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
- TOptimizeExprSettings settings(typesCtx);
+ THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext& typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) {
+ return CreateFunctorTransformer([&typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
+ TOptimizeExprSettings settings(&typesCtx);
settings.VisitChecker = [&](const TExprNode& node) {
return input.Get() == &node || (!TDqReadWrapBase::Match(&node) && !TDqPhyPrecompute::Match(&node));
};
@@ -159,7 +159,7 @@ namespace NYql::NDqs {
}
replaces[node.Get()] = NCommon::ValueToExprLiteral(node->GetTypeAnn(), *value, ctx, node->Pos());
}
- TOptimizeExprSettings settings(typesCtx);
+ TOptimizeExprSettings settings(&typesCtx);
settings.VisitStarted = true;
YQL_CLOG(DEBUG, ProviderDq) << "DqsReplacePrecomputes";
return RemapExpr(output, output, replaces, ctx, settings);
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.h b/ydb/library/yql/providers/dq/opt/dqs_opt.h
index e2a7a31e35b..9beec287f07 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.h
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.h
@@ -14,7 +14,7 @@ namespace NYql::NDqs {
class TDatabaseManager;
THolder<IGraphTransformer> CreateDqsFinalizingOptTransformer();
- THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer();
- THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry);
+ THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer(TTypeAnnotationContext& typesCtx);
+ THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext& typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 1b1ae7e9b12..5fa3ef59073 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -229,9 +229,9 @@ private:
void AfterCreate(TTransformationPipeline*) const final {}
void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
- pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(pipeline->GetTypeAnnotationContext().Get(), State_->FunctionRegistry), "ReplacePrecomputes");
+ pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(*pipeline->GetTypeAnnotationContext(), State_->FunctionRegistry), "ReplacePrecomputes");
pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false), "BuildPhy");
- pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(), "RewritePhyCallables");
+ pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(*pipeline->GetTypeAnnotationContext()), "RewritePhyCallables");
}
void AfterOptimize(TTransformationPipeline*) const final {}