aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoman Udovichenko <rvu@ydb.tech>2024-11-28 11:52:07 +0300
committerGitHub <noreply@github.com>2024-11-28 11:52:07 +0300
commit34a26dd332227aa4537d135c19c5a711044ffcfb (patch)
tree1cca470cc794fcb316a48cbc5123782f2c9cd9a2
parentbf6d0558119a704cd8b578f3879e2110bf7e784a (diff)
downloadydb-34a26dd332227aa4537d135c19c5a711044ffcfb.tar.gz
[yt provider] Don't run operation input if sections have nodes to calculate (#11947)
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp23
1 files changed, 21 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
index 0d986df6f7..1cddefb545 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
@@ -94,7 +94,7 @@ public:
TYtMerge::CallableName(),
TYtMapReduce::CallableName(),
},
- RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}),
+ RequireForTransientOp(),
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
);
AddHandler(
@@ -105,7 +105,7 @@ public:
RequireFirst(),
Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>)
);
- AddHandler({TYtReduce::CallableName()}, RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
+ AddHandler({TYtReduce::CallableName()}, RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleReduce));
AddHandler({TYtOutput::CallableName()}, RequireFirst(), Pass());
AddHandler({TYtPublish::CallableName()}, RequireAllOf({TYtPublish::idx_World, TYtPublish::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandlePublish));
AddHandler({TYtDropTable::CallableName()}, RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleDrop));
@@ -124,6 +124,21 @@ public:
TExecTransformerBase::Rewind();
}
+ static TExecTransformerBase::TPrerequisite RequireForTransientOp() {
+ return [] (const TExprNode::TPtr& input) {
+ auto status = RequireChild(*input, TYtTransientOpBase::idx_World);
+ // We have to run input only if it has no settings to calculate.
+ // Otherwise, we first of all wait world completion.
+ // Then begins node execution, which run settings calculation.
+ // And after that, starts input execution
+ // See YQL-19303
+ if (!HasNodesToCalculate(input->ChildPtr(TYtTransientOpBase::idx_Input))) {
+ status = status.Combine(RequireChild(*input, TYtTransientOpBase::idx_Input));
+ }
+ return status;
+ };
+ }
+
private:
static void PushHybridStats(const TYtState::TPtr& state, TStringBuf statName, TStringBuf opName, const TStringBuf& folderName = "") {
with_lock(state->StatisticsMutex) {
@@ -190,6 +205,10 @@ private:
return CalculateNodes(State_, input, cluster, needCalc, ctx);
}
+ if (auto opInput = op.Maybe<TYtTransientOpBase>().Input()) {
+ YQL_ENSURE(opInput.Ref().GetState() == TExprNode::EState::ExecutionComplete);
+ }
+
auto outSection = op.Output();
size_t outWithoutName = 0;