aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.ru>2022-03-10 11:24:10 +0300
committerudovichenko-r <udovichenko-r@yandex-team.ru>2022-03-10 11:24:10 +0300
commitb052e73ab6731e2c2eb991ee97d107fb9a4900d8 (patch)
tree4b8d0bb112c93a75af1bcb7c864b1418ecf57741
parentd32759b7eb812b4a4033cc61e1e98245544b4992 (diff)
downloadydb-b052e73ab6731e2c2eb991ee97d107fb9a4900d8.tar.gz
[yql] Fix Multiple stream errors
YQL-14451 ref:92b474355ed06667f7f337ed1a23ee0a1951ea43
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json7
-rw-r--r--ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp13
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp17
-rw-r--r--ydb/library/yql/core/yql_opt_utils.cpp21
-rw-r--r--ydb/library/yql/core/yql_opt_utils.h1
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp5
6 files changed, 33 insertions, 31 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 74326b7339..eb63fbc11e 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -1800,8 +1800,11 @@
},
{
"Name": "TCoToFlow",
- "Base": "TCoInputBase",
- "Match": {"Type": "Callable", "Name": "ToFlow"}
+ "Base": "TFreeArgCallable",
+ "Match": {"Type": "Callable", "Name": "ToFlow"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"}
+ ]
},
{
"Name": "TCoFromFlow",
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index 46195ae0c8..2758a199a2 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -5690,6 +5690,16 @@ TExprNode::TPtr ExpandAggrCompare(const TExprNode::TPtr& node, TExprContext& ctx
return node;
}
+TExprNode::TPtr DropToFlowDeps(const TExprNode::TPtr& node, TExprContext& ctx) {
+ if (node->ChildrenSize() == 1) {
+ return node;
+ }
+ YQL_CLOG(DEBUG, CorePeepHole) << __FUNCTION__;
+ auto children = node->ChildrenList();
+ children.resize(1);
+ return ctx.ChangeChildren(*node, std::move(children));
+}
+
ui64 ToDate(ui64 now) { return std::min<ui64>(NUdf::MAX_DATE - 1U, now / 86400000000ull); }
ui64 ToDatetime(ui64 now) { return std::min<ui64>(NUdf::MAX_DATETIME - 1U, now / 1000000ull); }
ui64 ToTimestamp(ui64 now) { return std::min<ui64>(NUdf::MAX_TIMESTAMP - 1ULL, now); }
@@ -5759,7 +5769,8 @@ struct TPeepHoleRules {
{"AggrGreaterOrEqual", &ExpandAggrCompare<false, true>},
{"RangeEmpty", &ExpandRangeEmpty},
{"AsRange", &ExpandAsRange},
- {"RangeFor", &ExpandRangeFor}
+ {"RangeFor", &ExpandRangeFor},
+ {"ToFlow", &DropToFlowDeps}
};
static constexpr std::initializer_list<TPeepHoleOptimizerMap::value_type> SimplifyStageRulesInit = {
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index 356414974e..8604422009 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -3120,7 +3120,7 @@ namespace NTypeAnnImpl {
}
IGraphTransformer::TStatus ToFlowWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+ if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -3128,8 +3128,21 @@ namespace NTypeAnnImpl {
if (!EnsureNewSeqType<true>(input->Head(), ctx.Expr, &itemType)) {
return IGraphTransformer::TStatus::Error;
}
+ const auto kind = input->Head().GetTypeAnn()->GetKind();
+
+ if (ETypeAnnotationKind::Flow == kind || ETypeAnnotationKind::Stream == kind) {
+ if (!EnsureMaxArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ } else {
+ for (ui32 i = 1; i < input->ChildrenSize(); ++i) {
+ if (!EnsureDependsOn(*input->Child(i), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+ }
- if (ETypeAnnotationKind::Flow == input->Head().GetTypeAnn()->GetKind()) {
+ if (ETypeAnnotationKind::Flow == kind) {
output = input->HeadPtr();
return IGraphTransformer::TStatus::Repeat;
}
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index 8385c239ae..00cba774ae 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1307,27 +1307,6 @@ bool WarnUnroderedSubquery(const TExprNode& unourderedSubquery, TExprContext& ct
return ctx.AddWarning(issue);
}
-TExprNode::TPtr DuplicateIndependentStreams(TExprNode::TPtr lambda, const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx) {
- TNodeOnNodeOwnedMap replaces;
- const auto isStream = [] (const TTypeAnnotationNode* type) {
- YQL_ENSURE(type, "Expect type annotated lambda in DuplicateIndependentStreams");
- return type->GetKind() == ETypeAnnotationKind::Stream || type->GetKind() == ETypeAnnotationKind::Flow;
- };
- VisitExpr(lambda->TailPtr(), [&](const TExprNode::TPtr& node) {
- if (stopTraverse(node.Get())) {
- return false;
- }
- if (auto scope = node->GetDependencyScope(); scope.has_value() && scope.value().second == nullptr
- && isStream(node->GetTypeAnn()) && node->IsCallable() && (node->ChildrenSize() == 0 || !isStream(node->Head().GetTypeAnn()))) {
-
- replaces[node.Get()] = ctx.ShallowCopy(*node);
- return false;
- }
- return true;
- });
- return ctx.ReplaceNodes(std::move(lambda), replaces);
-}
-
IGraphTransformer::TStatus LocalUnorderedOptimize(TExprNode::TPtr input, TExprNode::TPtr& output, const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx, TTypeAnnotationContext* typeCtx) {
output = input;
TProcessedNodesSet processedNodes;
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index 00635fea81..495f3c702a 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -97,7 +97,6 @@ TExprNode::TPtr OptimizeIfPresent(const TExprNode::TPtr& node, TExprContext& ctx
TExprNode::TPtr OptimizeExists(const TExprNode::TPtr& node, TExprContext& ctx);
bool WarnUnroderedSubquery(const TExprNode& unourderedSubquery, TExprContext& ctx);
-TExprNode::TPtr DuplicateIndependentStreams(TExprNode::TPtr lambda, const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx);
IGraphTransformer::TStatus LocalUnorderedOptimize(TExprNode::TPtr input, TExprNode::TPtr& output,
const std::function<bool(const TExprNode*)>& stopTraverse, TExprContext& ctx, TTypeAnnotationContext* typeCtx);
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index cbe871059d..ed39039ed3 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -199,10 +199,7 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o
return {};
}
- auto program = TCoLambda(DuplicateIndependentStreams(stage.Program().Ptr(),
- [](const TExprNode* node) {
- return TDqPhyPrecompute::Match(node) || TDqConnection::Match(node);
- }, ctx));
+ auto program = stage.Program();
ui32 index = FromString<ui32>(outputIndex.Value());
ui32 branchesCount = GetStageOutputsCount(stage, true);