diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-03-13 17:59:31 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2025-03-13 18:14:05 +0300 |
commit | f4df711d7e78fb95059b084b527b9003354549e2 (patch) | |
tree | fc82c7de5ae2fe63e8dbcc0bf00c38d7dc7e0644 | |
parent | 3043029fb8a5f38eb920b157cc642c1e620be73f (diff) | |
download | ydb-f4df711d7e78fb95059b084b527b9003354549e2.tar.gz |
Block status report for YT operations
commit_hash:f8e0160303c3726f15ecb5f67eeb1279fa90e69e
-rw-r--r-- | yql/essentials/core/yql_execution.cpp | 15 | ||||
-rw-r--r-- | yql/essentials/core/yql_execution.h | 7 | ||||
-rw-r--r-- | yql/essentials/core/yql_opt_utils.cpp | 48 | ||||
-rw-r--r-- | yql/essentials/core/yql_opt_utils.h | 3 | ||||
-rw-r--r-- | yt/yql/providers/yt/gateway/native/yql_yt_native.cpp | 45 |
5 files changed, 118 insertions, 0 deletions
diff --git a/yql/essentials/core/yql_execution.cpp b/yql/essentials/core/yql_execution.cpp index 5b34866487..57892229b3 100644 --- a/yql/essentials/core/yql_execution.cpp +++ b/yql/essentials/core/yql_execution.cpp @@ -961,3 +961,18 @@ void Out<NYql::TOperationProgress::EState>(class IOutputStream &o, NYql::TOperat return; } } + +template<> +void Out<NYql::TOperationProgress::EOpBlockStatus>(class IOutputStream &o, NYql::TOperationProgress::EOpBlockStatus x) { +#define YQL_OPERATION_BLOCK_STATUS_MAP_TO_STRING_IMPL(name, ...) \ + case NYql::TOperationProgress::EOpBlockStatus::name: \ + o << #name; \ + return; + + switch (x) { + YQL_OPERATION_BLOCK_STATUS_MAP(YQL_OPERATION_BLOCK_STATUS_MAP_TO_STRING_IMPL) + default: + o << static_cast<int>(x); + return; + } +} diff --git a/yql/essentials/core/yql_execution.h b/yql/essentials/core/yql_execution.h index e1714d5eda..cf1476402d 100644 --- a/yql/essentials/core/yql_execution.h +++ b/yql/essentials/core/yql_execution.h @@ -77,6 +77,13 @@ namespace NYql { , Stage(stage, TInstant::Now()) { } + + static EOpBlockStatus CombineBlockStatuses(EOpBlockStatus lhs, EOpBlockStatus rhs) { + if (lhs == rhs) { + return lhs; + } + return EOpBlockStatus::Partial; + } }; struct TOperationStatistics { diff --git a/yql/essentials/core/yql_opt_utils.cpp b/yql/essentials/core/yql_opt_utils.cpp index 73f0bf064c..9a51b06801 100644 --- a/yql/essentials/core/yql_opt_utils.cpp +++ b/yql/essentials/core/yql_opt_utils.cpp @@ -2423,4 +2423,52 @@ TExprNode::TPtr KeepWorld(TExprNode::TPtr node, const TExprNode& src, TExprConte } } +TOperationProgress::EOpBlockStatus DetermineProgramBlockStatus(const TExprNode& root) { + auto pRoot = &root; + + // TODO: remove after block IO transition to Stream + if (pRoot->IsCallable("ToFlow")) { + pRoot = &pRoot->Head(); + } + + if (pRoot->IsCallable("WideFromBlocks")) { + // Assume Full block status even if block output is not applied + pRoot = &pRoot->Head(); + } + + auto rootType = pRoot->GetTypeAnn(); + YQL_ENSURE(rootType); + + auto status = IsWideSequenceBlockType(*rootType) ? TOperationProgress::EOpBlockStatus::Full : TOperationProgress::EOpBlockStatus::None; + bool stop = false; + VisitExpr(*pRoot, [&](const TExprNode& node) { + if (stop || node.IsLambda()) { + return false; + } + + const TTypeAnnotationNode* nodeType = node.GetTypeAnn(); + YQL_ENSURE(nodeType); + + if (nodeType->GetKind() != ETypeAnnotationKind::Stream && nodeType->GetKind() != ETypeAnnotationKind::Flow) { + return false; + } + + const bool isBlock = IsWideSequenceBlockType(*nodeType); + if (status == TOperationProgress::EOpBlockStatus::Full && !isBlock || + status == TOperationProgress::EOpBlockStatus::None && isBlock) + { + status = TOperationProgress::EOpBlockStatus::Partial; + } + + if (status == TOperationProgress::EOpBlockStatus::Partial) { + stop = true; + return false; + } + + return true; + }); + + return status; +} + } diff --git a/yql/essentials/core/yql_opt_utils.h b/yql/essentials/core/yql_opt_utils.h index aa83773087..fd818dbd35 100644 --- a/yql/essentials/core/yql_opt_utils.h +++ b/yql/essentials/core/yql_opt_utils.h @@ -1,6 +1,7 @@ #pragma once #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <yql/essentials/core/yql_execution.h> #include <yql/essentials/core/yql_graph_transformer.h> #include <yql/essentials/core/yql_opt_window.h> #include <yql/essentials/core/yql_type_annotation.h> @@ -192,4 +193,6 @@ bool IsOptimizerDisabled(const TTypeAnnotationContext& types) { extern const char KeepWorldOptName[]; +TOperationProgress::EOpBlockStatus DetermineProgramBlockStatus(const TExprNode& root); + } diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index feeceb31e5..330cfbbbad 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -920,6 +920,8 @@ public: execCtx->SetOutput(outputOp.Cast().Output()); } + ReportBlockStatus(opBase, execCtx); + TFuture<void> future; if (auto op = opBase.Maybe<TYtSort>()) { future = DoSort(op.Cast(), execCtx); @@ -5744,6 +5746,49 @@ private: } } + static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) { + if (execCtx->Options_.PublicId().Empty()) { + return; + } + + auto opPublicId = *execCtx->Options_.PublicId(); + + TOperationProgress::EOpBlockStatus status; + if (auto map = op.Maybe<TYtMap>()) { + status = DetermineProgramBlockStatus(map.Cast().Mapper().Body().Ref()); + } else if (auto map = op.Maybe<TYtReduce>()) { + status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref()); + } else if (auto map = op.Maybe<TYtMapReduce>()) { + status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref()); + if (auto mapLambda = map.Cast().Mapper().Maybe<TCoLambda>()) { + status = TOperationProgress::CombineBlockStatuses(status, DetermineProgramBlockStatus(mapLambda.Cast().Body().Ref())); + } + } else if (auto fill = op.Maybe<TYtFill>()) { + status = DetermineProgramBlockStatus(fill.Cast().Content().Body().Ref()); + } else if (op.Maybe<TYtSort>()) { + return; + } else if (op.Maybe<TYtCopy>()) { + return; + } else if (op.Maybe<TYtMerge>()) { + return; + } else if (op.Maybe<TYtTouch>()) { + return; + } else if (op.Maybe<TYtDropTable>()) { + return; + } else if (op.Maybe<TYtStatOut>()) { + return; + } else if (op.Maybe<TYtDqProcessWrite>()) { + return; + } else { + YQL_ENSURE(false, "unknown operation: " << op.Ref().Content()); + } + + YQL_CLOG(INFO, ProviderYt) << "Reporting " << status << " block status for operation " << op.Ref().Content() << " with public id #" << opPublicId; + auto p = TOperationProgress(TString(YtProviderName), opPublicId, TOperationProgress::EState::InProgress); + p.BlockStatus = status; + execCtx->Session_->ProgressWriter_(p); + } + private: const TYtNativeServices Services_; const TConfigClusters::TPtr Clusters_; |