aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorziganshinmr <ziganshinmr@yandex-team.com>2025-03-13 17:59:31 +0300
committerziganshinmr <ziganshinmr@yandex-team.com>2025-03-13 18:14:05 +0300
commitf4df711d7e78fb95059b084b527b9003354549e2 (patch)
treefc82c7de5ae2fe63e8dbcc0bf00c38d7dc7e0644 /yt
parent3043029fb8a5f38eb920b157cc642c1e620be73f (diff)
downloadydb-f4df711d7e78fb95059b084b527b9003354549e2.tar.gz
Block status report for YT operations
commit_hash:f8e0160303c3726f15ecb5f67eeb1279fa90e69e
Diffstat (limited to 'yt')
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp45
1 files changed, 45 insertions, 0 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index feeceb31e5..330cfbbbad 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -920,6 +920,8 @@ public:
execCtx->SetOutput(outputOp.Cast().Output());
}
+ ReportBlockStatus(opBase, execCtx);
+
TFuture<void> future;
if (auto op = opBase.Maybe<TYtSort>()) {
future = DoSort(op.Cast(), execCtx);
@@ -5744,6 +5746,49 @@ private:
}
}
+ static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) {
+ if (execCtx->Options_.PublicId().Empty()) {
+ return;
+ }
+
+ auto opPublicId = *execCtx->Options_.PublicId();
+
+ TOperationProgress::EOpBlockStatus status;
+ if (auto map = op.Maybe<TYtMap>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Mapper().Body().Ref());
+ } else if (auto map = op.Maybe<TYtReduce>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref());
+ } else if (auto map = op.Maybe<TYtMapReduce>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref());
+ if (auto mapLambda = map.Cast().Mapper().Maybe<TCoLambda>()) {
+ status = TOperationProgress::CombineBlockStatuses(status, DetermineProgramBlockStatus(mapLambda.Cast().Body().Ref()));
+ }
+ } else if (auto fill = op.Maybe<TYtFill>()) {
+ status = DetermineProgramBlockStatus(fill.Cast().Content().Body().Ref());
+ } else if (op.Maybe<TYtSort>()) {
+ return;
+ } else if (op.Maybe<TYtCopy>()) {
+ return;
+ } else if (op.Maybe<TYtMerge>()) {
+ return;
+ } else if (op.Maybe<TYtTouch>()) {
+ return;
+ } else if (op.Maybe<TYtDropTable>()) {
+ return;
+ } else if (op.Maybe<TYtStatOut>()) {
+ return;
+ } else if (op.Maybe<TYtDqProcessWrite>()) {
+ return;
+ } else {
+ YQL_ENSURE(false, "unknown operation: " << op.Ref().Content());
+ }
+
+ YQL_CLOG(INFO, ProviderYt) << "Reporting " << status << " block status for operation " << op.Ref().Content() << " with public id #" << opPublicId;
+ auto p = TOperationProgress(TString(YtProviderName), opPublicId, TOperationProgress::EState::InProgress);
+ p.BlockStatus = status;
+ execCtx->Session_->ProgressWriter_(p);
+ }
+
private:
const TYtNativeServices Services_;
const TConfigClusters::TPtr Clusters_;