aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-03-10 14:39:44 +0300
committervvvv <vvvv@ydb.tech>2023-03-10 14:39:44 +0300
commit7c23836fb92f8c9713acb10fb00e286d8a5ed210 (patch)
treebce5caf3398cfdff542722a6e253715ba527d4a3
parent01d78e71ffdf60e161e00c6cd9d2d31bdfbe9075 (diff)
downloadydb-7c23836fb92f8c9713acb10fb00e286d8a5ed210.tar.gz
YQL-15754 owning ptr to async nodes, don't resurrect unreachable nodes
-rw-r--r--ydb/library/yql/core/yql_execution.cpp7
-rw-r--r--ydb/library/yql/core/yql_expr_csee.cpp20
2 files changed, 23 insertions, 4 deletions
diff --git a/ydb/library/yql/core/yql_execution.cpp b/ydb/library/yql/core/yql_execution.cpp
index 1c4fe1b60d..8398d505ab 100644
--- a/ydb/library/yql/core/yql_execution.cpp
+++ b/ydb/library/yql/core/yql_execution.cpp
@@ -131,6 +131,8 @@ public:
}
YQL_CLOG(INFO, CoreExecution) << "Completed async execution for node #" << item.Node->UniqueId();
+ auto asyncIt = AsyncNodes.find(item.Node);
+ YQL_ENSURE(asyncIt != AsyncNodes.end());
TExprNode::TPtr callableOutput;
auto status = item.DataProvider->GetCallableExecutionTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx);
Y_VERIFY(callableOutput);
@@ -157,6 +159,8 @@ public:
{
FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput);
}
+
+ AsyncNodes.erase(asyncIt);
}
if (!ReplaceNewNodes(output, ctx)) {
@@ -216,6 +220,7 @@ public:
TrackableNodes.clear();
CollectingNodes.clear();
ProvidersCache.clear();
+ AsyncNodes.clear();
}
TStatus ExecuteNode(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
@@ -460,6 +465,7 @@ public:
Y_UNUSED(ctx);
YQL_CLOG(INFO, CoreExecution) << "Register async execution for node #" << node->UniqueId();
auto future = dataProvider->GetCallableExecutionTransformer().GetAsyncFuture(*node);
+ AsyncNodes[node.Get()] = node;
SubscribeAsyncFuture(node, dataProvider, future);
}
@@ -696,6 +702,7 @@ private:
TExprNode::TListType FreshPendingNodes;
bool DeterministicMode;
+ TNodeOnNodeOwnedMap AsyncNodes;
};
IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited);
diff --git a/ydb/library/yql/core/yql_expr_csee.cpp b/ydb/library/yql/core/yql_expr_csee.cpp
index 8b7b2afd4e..e3cb1337c6 100644
--- a/ydb/library/yql/core/yql_expr_csee.cpp
+++ b/ydb/library/yql/core/yql_expr_csee.cpp
@@ -500,7 +500,8 @@ namespace {
TExprNode::TPtr VisitNode(TExprNode& node, TExprNode* currentLambda, ui16 level,
std::unordered_multimap<ui64, TExprNode*>& uniqueNodes,
std::unordered_multimap<ui64, TExprNode*>& incompleteNodes,
- TNodeMap<TExprNode*>& renames, const TColumnOrderStorage& coStore) {
+ TNodeMap<TExprNode*>& renames, const TColumnOrderStorage& coStore,
+ const TNodeSet& reachable) {
if (node.Type() == TExprNode::Argument) {
return nullptr;
@@ -515,13 +516,13 @@ namespace {
if (node.Type() == TExprNode::Lambda) {
for (ui32 i = 1U; i < node.ChildrenSize(); ++i) {
- if (auto newNode = VisitNode(*node.Child(i), &node, level + 1U, uniqueNodes, incompleteNodes, renames, coStore)) {
+ if (auto newNode = VisitNode(*node.Child(i), &node, level + 1U, uniqueNodes, incompleteNodes, renames, coStore, reachable)) {
node.ChildRef(i) = std::move(newNode);
}
}
} else {
for (ui32 i = 0; i < node.ChildrenSize(); ++i) {
- if (auto newNode = VisitNode(*node.Child(i), currentLambda, level, uniqueNodes, incompleteNodes, renames, coStore)) {
+ if (auto newNode = VisitNode(*node.Child(i), currentLambda, level, uniqueNodes, incompleteNodes, renames, coStore, reachable)) {
node.ChildRef(i) = std::move(newNode);
}
}
@@ -539,6 +540,11 @@ namespace {
continue;
}
+ if (!reachable.contains(iter->second)) {
+ iter = nodesSet.erase(iter);
+ continue;
+ }
+
if (!EqualNodes(node, *iter->second, coStore)) {
#ifndef NDEBUG
if (!GetEnv("YQL_ALLOW_CSEE_HASH_COLLISION")) {
@@ -583,10 +589,16 @@ IGraphTransformer::TStatus EliminateCommonSubExpressions(const TExprNode::TPtr&
{
YQL_PROFILE_SCOPE(DEBUG, forSubGraph ? "EliminateCommonSubExpressionsForSubGraph" : "EliminateCommonSubExpressions");
output = input;
+ TNodeSet reachable;
+ VisitExpr(*output, [&](const TExprNode& node) {
+ reachable.emplace(&node);
+ return true;
+ });
+
TNodeMap<TExprNode*> renames;
//Cerr << "INPUT\n" << output->Dump() << "\n";
std::unordered_multimap<ui64, TExprNode*> incompleteNodes;
- const auto newNode = VisitNode(*output, nullptr, 0, ctx.UniqueNodes, incompleteNodes, renames, coStore);
+ const auto newNode = VisitNode(*output, nullptr, 0, ctx.UniqueNodes, incompleteNodes, renames, coStore, reachable);
YQL_ENSURE(forSubGraph || !newNode);
//Cerr << "OUTPUT\n" << output->Dump() << "\n";
return IGraphTransformer::TStatus::Ok;