diff options
author | Roman Udovichenko <rvu@ydb.tech> | 2024-11-28 11:52:07 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-28 11:52:07 +0300 |
commit | 34a26dd332227aa4537d135c19c5a711044ffcfb (patch) | |
tree | 1cca470cc794fcb316a48cbc5123782f2c9cd9a2 | |
parent | bf6d0558119a704cd8b578f3879e2110bf7e784a (diff) | |
download | ydb-34a26dd332227aa4537d135c19c5a711044ffcfb.tar.gz |
[yt provider] Don't run operation input if sections have nodes to calculate (#11947)
-rw-r--r-- | ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp | 23 |
1 files changed, 21 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 0d986df6f7..1cddefb545 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -94,7 +94,7 @@ public: TYtMerge::CallableName(), TYtMapReduce::CallableName(), }, - RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), + RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>) ); AddHandler( @@ -105,7 +105,7 @@ public: RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleOutputOp<true>) ); - AddHandler({TYtReduce::CallableName()}, RequireAllOf({TYtTransientOpBase::idx_World, TYtTransientOpBase::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandleReduce)); + AddHandler({TYtReduce::CallableName()}, RequireForTransientOp(), Hndl(&TYtDataSinkExecTransformer::HandleReduce)); AddHandler({TYtOutput::CallableName()}, RequireFirst(), Pass()); AddHandler({TYtPublish::CallableName()}, RequireAllOf({TYtPublish::idx_World, TYtPublish::idx_Input}), Hndl(&TYtDataSinkExecTransformer::HandlePublish)); AddHandler({TYtDropTable::CallableName()}, RequireFirst(), Hndl(&TYtDataSinkExecTransformer::HandleDrop)); @@ -124,6 +124,21 @@ public: TExecTransformerBase::Rewind(); } + static TExecTransformerBase::TPrerequisite RequireForTransientOp() { + return [] (const TExprNode::TPtr& input) { + auto status = RequireChild(*input, TYtTransientOpBase::idx_World); + // We have to run input only if it has no settings to calculate. + // Otherwise, we first of all wait world completion. + // Then begins node execution, which run settings calculation. + // And after that, starts input execution + // See YQL-19303 + if (!HasNodesToCalculate(input->ChildPtr(TYtTransientOpBase::idx_Input))) { + status = status.Combine(RequireChild(*input, TYtTransientOpBase::idx_Input)); + } + return status; + }; + } + private: static void PushHybridStats(const TYtState::TPtr& state, TStringBuf statName, TStringBuf opName, const TStringBuf& folderName = "") { with_lock(state->StatisticsMutex) { @@ -190,6 +205,10 @@ private: return CalculateNodes(State_, input, cluster, needCalc, ctx); } + if (auto opInput = op.Maybe<TYtTransientOpBase>().Input()) { + YQL_ENSURE(opInput.Ref().GetState() == TExprNode::EState::ExecutionComplete); + } + auto outSection = op.Output(); size_t outWithoutName = 0; |