summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation.cpp
diff options
context:
space:
mode:
authornadya73 <[email protected]>2025-07-30 14:15:23 +0300
committernadya73 <[email protected]>2025-07-30 14:54:41 +0300
commit571053e719291f94f9d2da514abfded5de7c276b (patch)
tree0f9bf19a4f264b50c39e5ee64a3e54a87dbfad0c /yt/cpp/mapreduce/client/operation.cpp
parent7f083965e3b8a40b42c3d0e3832b60210c56ee6e (diff)
[yt/cpp/mapreduce] Add GetAlerts in IOperation
commit_hash:79c12c795b4cd3461e20543dafe5c16f47fa7b4b
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp46
1 files changed, 36 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index a885657fb57..97346c21067 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -2309,6 +2309,8 @@ public:
TMaybe<TYtError> GetError();
TJobStatistics GetJobStatistics();
TMaybe<TOperationBriefProgress> GetBriefProgress();
+ TMaybe<THashMap<TString, TYtError>> GetAlerts();
+
void AbortOperation();
void CompleteOperation();
void SuspendOperation(const TSuspendOperationOptions& options);
@@ -2328,7 +2330,13 @@ public:
private:
void OnStarted(const TOperationId& operationId);
- void UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func);
+ struct TUpdateAttributesOptions
+ {
+ bool NeedJobStatistics = false;
+ bool NeedAlerts = false;
+ };
+
+ void UpdateAttributesAndCall(const TUpdateAttributesOptions& options, std::function<void(const TOperationAttributes&)> func);
void SyncFinishOperationImpl(const TOperationAttributes&);
static void* SyncFinishOperationProc(void* );
@@ -2494,7 +2502,7 @@ TString TOperation::TOperationImpl::GetStatus()
}
}
TMaybe<TString> state;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
+ UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) {
state = attributes.State;
});
@@ -2556,7 +2564,7 @@ EOperationBriefState TOperation::TOperationImpl::GetBriefState()
{
ValidateOperationStarted();
EOperationBriefState result = EOperationBriefState::InProgress;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
+ UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) {
Y_ABORT_UNLESS(attributes.BriefState,
"get_operation for operation %s has not returned \"state\" field",
GetGuidAsString(*Id_).data());
@@ -2569,7 +2577,7 @@ TMaybe<TYtError> TOperation::TOperationImpl::GetError()
{
ValidateOperationStarted();
TMaybe<TYtError> result;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
+ UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) {
Y_ABORT_UNLESS(attributes.Result);
result = attributes.Result->Error;
});
@@ -2580,7 +2588,7 @@ TJobStatistics TOperation::TOperationImpl::GetJobStatistics()
{
ValidateOperationStarted();
TJobStatistics result;
- UpdateAttributesAndCall(true, [&] (const TOperationAttributes& attributes) {
+ UpdateAttributesAndCall(/*options*/ {.NeedJobStatistics = true}, [&] (const TOperationAttributes& attributes) {
if (attributes.Progress) {
result = attributes.Progress->JobStatistics;
}
@@ -2594,17 +2602,27 @@ TMaybe<TOperationBriefProgress> TOperation::TOperationImpl::GetBriefProgress()
{
auto g = Guard(Lock_);
if (CompletePromise_.Defined()) {
- // Poller do this job for us
+ // Poller do this job for us.
return Attributes_.BriefProgress;
}
}
TMaybe<TOperationBriefProgress> result;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
+ UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) {
result = attributes.BriefProgress;
});
return result;
}
+TMaybe<THashMap<TString, TYtError>> TOperation::TOperationImpl::GetAlerts()
+{
+ ValidateOperationStarted();
+ TMaybe<THashMap<TString, TYtError>> result;
+ UpdateAttributesAndCall(/*options*/ {.NeedAlerts = true}, [&] (const TOperationAttributes& attributes) {
+ result = attributes.Alerts;
+ });
+ return result;
+}
+
void TOperation::TOperationImpl::UpdateBriefProgress(TMaybe<TOperationBriefProgress> briefProgress)
{
auto g = Guard(Lock_);
@@ -2665,14 +2683,14 @@ void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId)
}
void TOperation::TOperationImpl::UpdateAttributesAndCall(
- bool needJobStatistics,
+ const TUpdateAttributesOptions& options,
std::function<void(const TOperationAttributes&)> func)
{
{
auto g = Guard(Lock_);
if (Attributes_.BriefState
&& *Attributes_.BriefState != EOperationBriefState::InProgress
- && (!needJobStatistics || Attributes_.Progress))
+ && (!options.NeedJobStatistics || Attributes_.Progress))
{
func(Attributes_);
return;
@@ -2685,9 +2703,12 @@ void TOperation::TOperationImpl::UpdateAttributesAndCall(
.Add(EOperationAttribute::BriefProgress);
// To avoid overloading Cypress, we only request the progress attribute as needed,
// typically when the user asks for job statistics.
- if (needJobStatistics) {
+ if (options.NeedJobStatistics) {
filter.Add(EOperationAttribute::Progress);
}
+ if (options.NeedAlerts) {
+ filter.Add(EOperationAttribute::Alerts);
+ }
auto attributes = RequestWithRetry<TOperationAttributes>(
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
@@ -2991,6 +3012,11 @@ TMaybe<TOperationBriefProgress> TOperation::GetBriefProgress()
return Impl_->GetBriefProgress();
}
+TMaybe<THashMap<TString, TYtError>> TOperation::GetAlerts()
+{
+ return Impl_->GetAlerts();
+}
+
void TOperation::AbortOperation()
{
Impl_->AbortOperation();