diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-03-13 17:59:31 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-03-13 18:14:05 +0300 |
commit | f4df711d7e78fb95059b084b527b9003354549e2 (patch) | |
tree | fc82c7de5ae2fe63e8dbcc0bf00c38d7dc7e0644 /yt | |
parent | 3043029fb8a5f38eb920b157cc642c1e620be73f (diff) | |
download | ydb-f4df711d7e78fb95059b084b527b9003354549e2.tar.gz |
Block status report for YT operations
commit_hash:f8e0160303c3726f15ecb5f67eeb1279fa90e69e
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yql/providers/yt/gateway/native/yql_yt_native.cpp | 45 |
1 files changed, 45 insertions, 0 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index feeceb31e5..330cfbbbad 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -920,6 +920,8 @@ public: execCtx->SetOutput(outputOp.Cast().Output()); } + ReportBlockStatus(opBase, execCtx); + TFuture<void> future; if (auto op = opBase.Maybe<TYtSort>()) { future = DoSort(op.Cast(), execCtx); @@ -5744,6 +5746,49 @@ private: } } + static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) { + if (execCtx->Options_.PublicId().Empty()) { + return; + } + + auto opPublicId = *execCtx->Options_.PublicId(); + + TOperationProgress::EOpBlockStatus status; + if (auto map = op.Maybe<TYtMap>()) { + status = DetermineProgramBlockStatus(map.Cast().Mapper().Body().Ref()); + } else if (auto map = op.Maybe<TYtReduce>()) { + status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref()); + } else if (auto map = op.Maybe<TYtMapReduce>()) { + status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref()); + if (auto mapLambda = map.Cast().Mapper().Maybe<TCoLambda>()) { + status = TOperationProgress::CombineBlockStatuses(status, DetermineProgramBlockStatus(mapLambda.Cast().Body().Ref())); + } + } else if (auto fill = op.Maybe<TYtFill>()) { + status = DetermineProgramBlockStatus(fill.Cast().Content().Body().Ref()); + } else if (op.Maybe<TYtSort>()) { + return; + } else if (op.Maybe<TYtCopy>()) { + return; + } else if (op.Maybe<TYtMerge>()) { + return; + } else if (op.Maybe<TYtTouch>()) { + return; + } else if (op.Maybe<TYtDropTable>()) { + return; + } else if (op.Maybe<TYtStatOut>()) { + return; + } else if (op.Maybe<TYtDqProcessWrite>()) { + return; + } else { + YQL_ENSURE(false, "unknown operation: " << op.Ref().Content()); + } + + YQL_CLOG(INFO, ProviderYt) << "Reporting " << status << " block status for operation " << op.Ref().Content() << " with public id #" << opPublicId; + auto p = TOperationProgress(TString(YtProviderName), opPublicId, TOperationProgress::EState::InProgress); + p.BlockStatus = status; + execCtx->Session_->ProgressWriter_(p); + } + private: const TYtNativeServices Services_; const TConfigClusters::TPtr Clusters_; |