diff options
author | vvvv <vvvv@ydb.tech> | 2023-03-10 14:39:44 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-03-10 14:39:44 +0300 |
commit | 7c23836fb92f8c9713acb10fb00e286d8a5ed210 (patch) | |
tree | bce5caf3398cfdff542722a6e253715ba527d4a3 | |
parent | 01d78e71ffdf60e161e00c6cd9d2d31bdfbe9075 (diff) | |
download | ydb-7c23836fb92f8c9713acb10fb00e286d8a5ed210.tar.gz |
YQL-15754 owning ptr to async nodes, don't resurrect unreachable nodes
-rw-r--r-- | ydb/library/yql/core/yql_execution.cpp | 7 | ||||
-rw-r--r-- | ydb/library/yql/core/yql_expr_csee.cpp | 20 |
2 files changed, 23 insertions, 4 deletions
diff --git a/ydb/library/yql/core/yql_execution.cpp b/ydb/library/yql/core/yql_execution.cpp index 1c4fe1b60d..8398d505ab 100644 --- a/ydb/library/yql/core/yql_execution.cpp +++ b/ydb/library/yql/core/yql_execution.cpp @@ -131,6 +131,8 @@ public: } YQL_CLOG(INFO, CoreExecution) << "Completed async execution for node #" << item.Node->UniqueId(); + auto asyncIt = AsyncNodes.find(item.Node); + YQL_ENSURE(asyncIt != AsyncNodes.end()); TExprNode::TPtr callableOutput; auto status = item.DataProvider->GetCallableExecutionTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx); Y_VERIFY(callableOutput); @@ -157,6 +159,8 @@ public: { FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput); } + + AsyncNodes.erase(asyncIt); } if (!ReplaceNewNodes(output, ctx)) { @@ -216,6 +220,7 @@ public: TrackableNodes.clear(); CollectingNodes.clear(); ProvidersCache.clear(); + AsyncNodes.clear(); } TStatus ExecuteNode(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) { @@ -460,6 +465,7 @@ public: Y_UNUSED(ctx); YQL_CLOG(INFO, CoreExecution) << "Register async execution for node #" << node->UniqueId(); auto future = dataProvider->GetCallableExecutionTransformer().GetAsyncFuture(*node); + AsyncNodes[node.Get()] = node; SubscribeAsyncFuture(node, dataProvider, future); } @@ -696,6 +702,7 @@ private: TExprNode::TListType FreshPendingNodes; bool DeterministicMode; + TNodeOnNodeOwnedMap AsyncNodes; }; IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited); diff --git a/ydb/library/yql/core/yql_expr_csee.cpp b/ydb/library/yql/core/yql_expr_csee.cpp index 8b7b2afd4e..e3cb1337c6 100644 --- a/ydb/library/yql/core/yql_expr_csee.cpp +++ b/ydb/library/yql/core/yql_expr_csee.cpp @@ -500,7 +500,8 @@ namespace { TExprNode::TPtr VisitNode(TExprNode& node, TExprNode* currentLambda, ui16 level, std::unordered_multimap<ui64, TExprNode*>& uniqueNodes, std::unordered_multimap<ui64, TExprNode*>& incompleteNodes, - TNodeMap<TExprNode*>& renames, const TColumnOrderStorage& coStore) { + TNodeMap<TExprNode*>& renames, const TColumnOrderStorage& coStore, + const TNodeSet& reachable) { if (node.Type() == TExprNode::Argument) { return nullptr; @@ -515,13 +516,13 @@ namespace { if (node.Type() == TExprNode::Lambda) { for (ui32 i = 1U; i < node.ChildrenSize(); ++i) { - if (auto newNode = VisitNode(*node.Child(i), &node, level + 1U, uniqueNodes, incompleteNodes, renames, coStore)) { + if (auto newNode = VisitNode(*node.Child(i), &node, level + 1U, uniqueNodes, incompleteNodes, renames, coStore, reachable)) { node.ChildRef(i) = std::move(newNode); } } } else { for (ui32 i = 0; i < node.ChildrenSize(); ++i) { - if (auto newNode = VisitNode(*node.Child(i), currentLambda, level, uniqueNodes, incompleteNodes, renames, coStore)) { + if (auto newNode = VisitNode(*node.Child(i), currentLambda, level, uniqueNodes, incompleteNodes, renames, coStore, reachable)) { node.ChildRef(i) = std::move(newNode); } } @@ -539,6 +540,11 @@ namespace { continue; } + if (!reachable.contains(iter->second)) { + iter = nodesSet.erase(iter); + continue; + } + if (!EqualNodes(node, *iter->second, coStore)) { #ifndef NDEBUG if (!GetEnv("YQL_ALLOW_CSEE_HASH_COLLISION")) { @@ -583,10 +589,16 @@ IGraphTransformer::TStatus EliminateCommonSubExpressions(const TExprNode::TPtr& { YQL_PROFILE_SCOPE(DEBUG, forSubGraph ? "EliminateCommonSubExpressionsForSubGraph" : "EliminateCommonSubExpressions"); output = input; + TNodeSet reachable; + VisitExpr(*output, [&](const TExprNode& node) { + reachable.emplace(&node); + return true; + }); + TNodeMap<TExprNode*> renames; //Cerr << "INPUT\n" << output->Dump() << "\n"; std::unordered_multimap<ui64, TExprNode*> incompleteNodes; - const auto newNode = VisitNode(*output, nullptr, 0, ctx.UniqueNodes, incompleteNodes, renames, coStore); + const auto newNode = VisitNode(*output, nullptr, 0, ctx.UniqueNodes, incompleteNodes, renames, coStore, reachable); YQL_ENSURE(forSubGraph || !newNode); //Cerr << "OUTPUT\n" << output->Dump() << "\n"; return IGraphTransformer::TStatus::Ok; |