diff options
author | udovichenko-r <rvu@ydb.tech> | 2022-09-28 20:44:53 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2022-09-28 20:44:53 +0300 |
commit | ce67eb7f501987256b2d6fa5dd7dfb79fa5f49c8 (patch) | |
tree | 6a6ca586bacc31aa9900382f5a5d6c12ac1feebb | |
parent | b0bfbe84587b999b016e685ac249afa4a0a8adc1 (diff) | |
download | ydb-ce67eb7f501987256b2d6fa5dd7dfb79fa5f49c8.tar.gz |
[dq] Fix handling async file freezing in precompute execution
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index c5427cdeff1..7126b245ea6 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -1415,6 +1415,7 @@ private: static TAsyncTransformCallback HandlePrecomputeAsyncComplete(TExecStatePtr execState) { return TAsyncTransformCallback([execState](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { output = input; + input->SetState(TExprNode::EState::ExecutionRequired); TStatus combinedStatus = TStatus::Repeat; TExecState::TQueueType completed; auto newPromise = NThreading::NewPromise(); @@ -1451,7 +1452,7 @@ private: for (auto [_, input]: precomputes) { TString uniqId = TStringBuilder() << input->Content() << "(#" << input->UniqueId() << ')'; YQL_LOG_CTX_SCOPE(uniqId); - if (input->StartsExecution()) { + if (input->GetState() > TExprNode::EState::ExecutionRequired) { YQL_CLOG(DEBUG, ProviderDq) << "Continue async execution"; combinedStatus = combinedStatus.Combine(TStatus::Async); continue; @@ -1464,8 +1465,8 @@ private: auto publicIds = GetPublicIds(input); auto optimizedInput = input; - - auto status = PeepHole(input, optimizedInput, ctx); + optimizedInput->SetState(TExprNode::EState::ConstrComplete); + auto status = PeepHole(optimizedInput, optimizedInput, ctx); if (status.Level != TStatus::Ok) { return combinedStatus.Combine(status); } @@ -1482,7 +1483,7 @@ private: auto filesRes = NCommon::FreezeUsedFiles(*optimizedInput, files, *State->TypeCtx, ctx, [](const TString&){ return true; }, crutches); if (filesRes.first.Level != TStatus::Ok) { - combinedStatus = combinedStatus.Combine(status); + combinedStatus = combinedStatus.Combine(filesRes.first); if (filesRes.first.Level == TStatus::Error) { return filesRes.first; } |