diff options
author | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-03-10 11:24:10 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-03-10 11:24:10 +0300 |
commit | b052e73ab6731e2c2eb991ee97d107fb9a4900d8 (patch) | |
tree | 4b8d0bb112c93a75af1bcb7c864b1418ecf57741 | |
parent | d32759b7eb812b4a4033cc61e1e98245544b4992 (diff) | |
download | ydb-b052e73ab6731e2c2eb991ee97d107fb9a4900d8.tar.gz |
[yql] Fix Multiple stream errors
YQL-14451
ref:92b474355ed06667f7f337ed1a23ee0a1951ea43
-rw-r--r-- | ydb/library/yql/core/expr_nodes/yql_expr_nodes.json | 7 | ||||
-rw-r--r-- | ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp | 13 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_core.cpp | 17 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.cpp | 21 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_opt_utils.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 5 |
6 files changed, 33 insertions, 31 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 74326b7339..eb63fbc11e 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1800,8 +1800,11 @@ }, { "Name": "TCoToFlow", - "Base": "TCoInputBase", - "Match": {"Type": "Callable", "Name": "ToFlow"} + "Base": "TFreeArgCallable", + "Match": {"Type": "Callable", "Name": "ToFlow"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"} + ] }, { "Name": "TCoFromFlow", diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index 46195ae0c8..2758a199a2 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -5690,6 +5690,16 @@ TExprNode::TPtr ExpandAggrCompare(const TExprNode::TPtr& node, TExprContext& ctx return node; } +TExprNode::TPtr DropToFlowDeps(const TExprNode::TPtr& node, TExprContext& ctx) { + if (node->ChildrenSize() == 1) { + return node; + } + YQL_CLOG(DEBUG, CorePeepHole) << __FUNCTION__; + auto children = node->ChildrenList(); + children.resize(1); + return ctx.ChangeChildren(*node, std::move(children)); +} + ui64 ToDate(ui64 now) { return std::min<ui64>(NUdf::MAX_DATE - 1U, now / 86400000000ull); } ui64 ToDatetime(ui64 now) { return std::min<ui64>(NUdf::MAX_DATETIME - 1U, now / 1000000ull); } ui64 ToTimestamp(ui64 now) { return std::min<ui64>(NUdf::MAX_TIMESTAMP - 1ULL, now); } @@ -5759,7 +5769,8 @@ struct TPeepHoleRules { {"AggrGreaterOrEqual", &ExpandAggrCompare<false, true>}, {"RangeEmpty", &ExpandRangeEmpty}, {"AsRange", &ExpandAsRange}, - {"RangeFor", &ExpandRangeFor} + {"RangeFor", &ExpandRangeFor}, + {"ToFlow", &DropToFlowDeps} }; static constexpr std::initializer_list<TPeepHoleOptimizerMap::value_type> SimplifyStageRulesInit = { diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index 356414974e..8604422009 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -3120,7 +3120,7 @@ namespace NTypeAnnImpl { } IGraphTransformer::TStatus ToFlowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - if (!EnsureArgsCount(*input, 1, ctx.Expr)) { + if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -3128,8 +3128,21 @@ namespace NTypeAnnImpl { if (!EnsureNewSeqType<true>(input->Head(), ctx.Expr, &itemType)) { return IGraphTransformer::TStatus::Error; } + const auto kind = input->Head().GetTypeAnn()->GetKind(); + + if (ETypeAnnotationKind::Flow == kind || ETypeAnnotationKind::Stream == kind) { + if (!EnsureMaxArgsCount(*input, 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + } else { + for (ui32 i = 1; i < input->ChildrenSize(); ++i) { + if (!EnsureDependsOn(*input->Child(i), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + } + } - if (ETypeAnnotationKind::Flow == input->Head().GetTypeAnn()->GetKind()) { + if (ETypeAnnotationKind::Flow == kind) { output = input->HeadPtr(); return IGraphTransformer::TStatus::Repeat; } diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index 8385c239ae..00cba774ae 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1307,27 +1307,6 @@ bool WarnUnroderedSubquery(const TExprNode& unourderedSubquery, TExprContext& ct return ctx.AddWarning(issue); } -TExprNode::TPtr DuplicateIndependentStreams(TExprNode::TPtr lambda, const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx) { - TNodeOnNodeOwnedMap replaces; - const auto isStream = [] (const TTypeAnnotationNode* type) { - YQL_ENSURE(type, "Expect type annotated lambda in DuplicateIndependentStreams"); - return type->GetKind() == ETypeAnnotationKind::Stream || type->GetKind() == ETypeAnnotationKind::Flow; - }; - VisitExpr(lambda->TailPtr(), [&](const TExprNode::TPtr& node) { - if (stopTraverse(node.Get())) { - return false; - } - if (auto scope = node->GetDependencyScope(); scope.has_value() && scope.value().second == nullptr - && isStream(node->GetTypeAnn()) && node->IsCallable() && (node->ChildrenSize() == 0 || !isStream(node->Head().GetTypeAnn()))) { - - replaces[node.Get()] = ctx.ShallowCopy(*node); - return false; - } - return true; - }); - return ctx.ReplaceNodes(std::move(lambda), replaces); -} - IGraphTransformer::TStatus LocalUnorderedOptimize(TExprNode::TPtr input, TExprNode::TPtr& output, const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx, TTypeAnnotationContext* typeCtx) { output = input; TProcessedNodesSet processedNodes; diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index 00635fea81..495f3c702a 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -97,7 +97,6 @@ TExprNode::TPtr OptimizeIfPresent(const TExprNode::TPtr& node, TExprContext& ctx TExprNode::TPtr OptimizeExists(const TExprNode::TPtr& node, TExprContext& ctx); bool WarnUnroderedSubquery(const TExprNode& unourderedSubquery, TExprContext& ctx); -TExprNode::TPtr DuplicateIndependentStreams(TExprNode::TPtr lambda, const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx); IGraphTransformer::TStatus LocalUnorderedOptimize(TExprNode::TPtr input, TExprNode::TPtr& output, const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx, TTypeAnnotationContext* typeCtx); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index cbe871059d..ed39039ed3 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -199,10 +199,7 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o return {}; } - auto program = TCoLambda(DuplicateIndependentStreams(stage.Program().Ptr(), - [](const TExprNode* node) { - return TDqPhyPrecompute::Match(node) || TDqConnection::Match(node); - }, ctx)); + auto program = stage.Program(); ui32 index = FromString<ui32>(outputIndex.Value()); ui32 branchesCount = GetStageOutputsCount(stage, true); |