aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <ziganshinmr@yandex-team.com>2025-03-13 17:59:31 +0300
committerziganshinmr <ziganshinmr@yandex-team.com>2025-03-13 18:14:05 +0300
commitf4df711d7e78fb95059b084b527b9003354549e2 (patch)
treefc82c7de5ae2fe63e8dbcc0bf00c38d7dc7e0644
parent3043029fb8a5f38eb920b157cc642c1e620be73f (diff)
downloadydb-f4df711d7e78fb95059b084b527b9003354549e2.tar.gz
Block status report for YT operations
commit_hash:f8e0160303c3726f15ecb5f67eeb1279fa90e69e
-rw-r--r--yql/essentials/core/yql_execution.cpp15
-rw-r--r--yql/essentials/core/yql_execution.h7
-rw-r--r--yql/essentials/core/yql_opt_utils.cpp48
-rw-r--r--yql/essentials/core/yql_opt_utils.h3
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp45
5 files changed, 118 insertions, 0 deletions
diff --git a/yql/essentials/core/yql_execution.cpp b/yql/essentials/core/yql_execution.cpp
index 5b34866487..57892229b3 100644
--- a/yql/essentials/core/yql_execution.cpp
+++ b/yql/essentials/core/yql_execution.cpp
@@ -961,3 +961,18 @@ void Out<NYql::TOperationProgress::EState>(class IOutputStream &o, NYql::TOperat
return;
}
}
+
+template<>
+void Out<NYql::TOperationProgress::EOpBlockStatus>(class IOutputStream &o, NYql::TOperationProgress::EOpBlockStatus x) {
+#define YQL_OPERATION_BLOCK_STATUS_MAP_TO_STRING_IMPL(name, ...) \
+ case NYql::TOperationProgress::EOpBlockStatus::name: \
+ o << #name; \
+ return;
+
+ switch (x) {
+ YQL_OPERATION_BLOCK_STATUS_MAP(YQL_OPERATION_BLOCK_STATUS_MAP_TO_STRING_IMPL)
+ default:
+ o << static_cast<int>(x);
+ return;
+ }
+}
diff --git a/yql/essentials/core/yql_execution.h b/yql/essentials/core/yql_execution.h
index e1714d5eda..cf1476402d 100644
--- a/yql/essentials/core/yql_execution.h
+++ b/yql/essentials/core/yql_execution.h
@@ -77,6 +77,13 @@ namespace NYql {
, Stage(stage, TInstant::Now())
{
}
+
+ static EOpBlockStatus CombineBlockStatuses(EOpBlockStatus lhs, EOpBlockStatus rhs) {
+ if (lhs == rhs) {
+ return lhs;
+ }
+ return EOpBlockStatus::Partial;
+ }
};
struct TOperationStatistics {
diff --git a/yql/essentials/core/yql_opt_utils.cpp b/yql/essentials/core/yql_opt_utils.cpp
index 73f0bf064c..9a51b06801 100644
--- a/yql/essentials/core/yql_opt_utils.cpp
+++ b/yql/essentials/core/yql_opt_utils.cpp
@@ -2423,4 +2423,52 @@ TExprNode::TPtr KeepWorld(TExprNode::TPtr node, const TExprNode& src, TExprConte
}
}
+TOperationProgress::EOpBlockStatus DetermineProgramBlockStatus(const TExprNode& root) {
+ auto pRoot = &root;
+
+ // TODO: remove after block IO transition to Stream
+ if (pRoot->IsCallable("ToFlow")) {
+ pRoot = &pRoot->Head();
+ }
+
+ if (pRoot->IsCallable("WideFromBlocks")) {
+ // Assume Full block status even if block output is not applied
+ pRoot = &pRoot->Head();
+ }
+
+ auto rootType = pRoot->GetTypeAnn();
+ YQL_ENSURE(rootType);
+
+ auto status = IsWideSequenceBlockType(*rootType) ? TOperationProgress::EOpBlockStatus::Full : TOperationProgress::EOpBlockStatus::None;
+ bool stop = false;
+ VisitExpr(*pRoot, [&](const TExprNode& node) {
+ if (stop || node.IsLambda()) {
+ return false;
+ }
+
+ const TTypeAnnotationNode* nodeType = node.GetTypeAnn();
+ YQL_ENSURE(nodeType);
+
+ if (nodeType->GetKind() != ETypeAnnotationKind::Stream && nodeType->GetKind() != ETypeAnnotationKind::Flow) {
+ return false;
+ }
+
+ const bool isBlock = IsWideSequenceBlockType(*nodeType);
+ if (status == TOperationProgress::EOpBlockStatus::Full && !isBlock ||
+ status == TOperationProgress::EOpBlockStatus::None && isBlock)
+ {
+ status = TOperationProgress::EOpBlockStatus::Partial;
+ }
+
+ if (status == TOperationProgress::EOpBlockStatus::Partial) {
+ stop = true;
+ return false;
+ }
+
+ return true;
+ });
+
+ return status;
+}
+
}
diff --git a/yql/essentials/core/yql_opt_utils.h b/yql/essentials/core/yql_opt_utils.h
index aa83773087..fd818dbd35 100644
--- a/yql/essentials/core/yql_opt_utils.h
+++ b/yql/essentials/core/yql_opt_utils.h
@@ -1,6 +1,7 @@
#pragma once
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
+#include <yql/essentials/core/yql_execution.h>
#include <yql/essentials/core/yql_graph_transformer.h>
#include <yql/essentials/core/yql_opt_window.h>
#include <yql/essentials/core/yql_type_annotation.h>
@@ -192,4 +193,6 @@ bool IsOptimizerDisabled(const TTypeAnnotationContext& types) {
extern const char KeepWorldOptName[];
+TOperationProgress::EOpBlockStatus DetermineProgramBlockStatus(const TExprNode& root);
+
}
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index feeceb31e5..330cfbbbad 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -920,6 +920,8 @@ public:
execCtx->SetOutput(outputOp.Cast().Output());
}
+ ReportBlockStatus(opBase, execCtx);
+
TFuture<void> future;
if (auto op = opBase.Maybe<TYtSort>()) {
future = DoSort(op.Cast(), execCtx);
@@ -5744,6 +5746,49 @@ private:
}
}
+ static void ReportBlockStatus(const TYtOpBase& op, const TExecContext<TRunOptions>::TPtr& execCtx) {
+ if (execCtx->Options_.PublicId().Empty()) {
+ return;
+ }
+
+ auto opPublicId = *execCtx->Options_.PublicId();
+
+ TOperationProgress::EOpBlockStatus status;
+ if (auto map = op.Maybe<TYtMap>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Mapper().Body().Ref());
+ } else if (auto map = op.Maybe<TYtReduce>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref());
+ } else if (auto map = op.Maybe<TYtMapReduce>()) {
+ status = DetermineProgramBlockStatus(map.Cast().Reducer().Body().Ref());
+ if (auto mapLambda = map.Cast().Mapper().Maybe<TCoLambda>()) {
+ status = TOperationProgress::CombineBlockStatuses(status, DetermineProgramBlockStatus(mapLambda.Cast().Body().Ref()));
+ }
+ } else if (auto fill = op.Maybe<TYtFill>()) {
+ status = DetermineProgramBlockStatus(fill.Cast().Content().Body().Ref());
+ } else if (op.Maybe<TYtSort>()) {
+ return;
+ } else if (op.Maybe<TYtCopy>()) {
+ return;
+ } else if (op.Maybe<TYtMerge>()) {
+ return;
+ } else if (op.Maybe<TYtTouch>()) {
+ return;
+ } else if (op.Maybe<TYtDropTable>()) {
+ return;
+ } else if (op.Maybe<TYtStatOut>()) {
+ return;
+ } else if (op.Maybe<TYtDqProcessWrite>()) {
+ return;
+ } else {
+ YQL_ENSURE(false, "unknown operation: " << op.Ref().Content());
+ }
+
+ YQL_CLOG(INFO, ProviderYt) << "Reporting " << status << " block status for operation " << op.Ref().Content() << " with public id #" << opPublicId;
+ auto p = TOperationProgress(TString(YtProviderName), opPublicId, TOperationProgress::EState::InProgress);
+ p.BlockStatus = status;
+ execCtx->Session_->ProgressWriter_(p);
+ }
+
private:
const TYtNativeServices Services_;
const TConfigClusters::TPtr Clusters_;