aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <rvu@ydb.tech>2022-09-28 20:44:53 +0300
committerudovichenko-r <rvu@ydb.tech>2022-09-28 20:44:53 +0300
commitce67eb7f501987256b2d6fa5dd7dfb79fa5f49c8 (patch)
tree6a6ca586bacc31aa9900382f5a5d6c12ac1feebb
parentb0bfbe84587b999b016e685ac249afa4a0a8adc1 (diff)
downloadydb-ce67eb7f501987256b2d6fa5dd7dfb79fa5f49c8.tar.gz
[dq] Fix handling async file freezing in precompute execution
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp9
1 files changed, 5 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index c5427cdeff1..7126b245ea6 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -1415,6 +1415,7 @@ private:
static TAsyncTransformCallback HandlePrecomputeAsyncComplete(TExecStatePtr execState) {
return TAsyncTransformCallback([execState](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
output = input;
+ input->SetState(TExprNode::EState::ExecutionRequired);
TStatus combinedStatus = TStatus::Repeat;
TExecState::TQueueType completed;
auto newPromise = NThreading::NewPromise();
@@ -1451,7 +1452,7 @@ private:
for (auto [_, input]: precomputes) {
TString uniqId = TStringBuilder() << input->Content() << "(#" << input->UniqueId() << ')';
YQL_LOG_CTX_SCOPE(uniqId);
- if (input->StartsExecution()) {
+ if (input->GetState() > TExprNode::EState::ExecutionRequired) {
YQL_CLOG(DEBUG, ProviderDq) << "Continue async execution";
combinedStatus = combinedStatus.Combine(TStatus::Async);
continue;
@@ -1464,8 +1465,8 @@ private:
auto publicIds = GetPublicIds(input);
auto optimizedInput = input;
-
- auto status = PeepHole(input, optimizedInput, ctx);
+ optimizedInput->SetState(TExprNode::EState::ConstrComplete);
+ auto status = PeepHole(optimizedInput, optimizedInput, ctx);
if (status.Level != TStatus::Ok) {
return combinedStatus.Combine(status);
}
@@ -1482,7 +1483,7 @@ private:
auto filesRes = NCommon::FreezeUsedFiles(*optimizedInput, files, *State->TypeCtx, ctx, [](const TString&){ return true; }, crutches);
if (filesRes.first.Level != TStatus::Ok) {
- combinedStatus = combinedStatus.Combine(status);
+ combinedStatus = combinedStatus.Combine(filesRes.first);
if (filesRes.first.Level == TStatus::Error) {
return filesRes.first;
}