diff options
| author | nadya73 <[email protected]> | 2025-07-30 14:15:23 +0300 |
|---|---|---|
| committer | nadya73 <[email protected]> | 2025-07-30 14:54:41 +0300 |
| commit | 571053e719291f94f9d2da514abfded5de7c276b (patch) | |
| tree | 0f9bf19a4f264b50c39e5ee64a3e54a87dbfad0c /yt/cpp/mapreduce/client/operation.cpp | |
| parent | 7f083965e3b8a40b42c3d0e3832b60210c56ee6e (diff) | |
[yt/cpp/mapreduce] Add GetAlerts in IOperation
commit_hash:79c12c795b4cd3461e20543dafe5c16f47fa7b4b
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 46 |
1 files changed, 36 insertions, 10 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index a885657fb57..97346c21067 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -2309,6 +2309,8 @@ public: TMaybe<TYtError> GetError(); TJobStatistics GetJobStatistics(); TMaybe<TOperationBriefProgress> GetBriefProgress(); + TMaybe<THashMap<TString, TYtError>> GetAlerts(); + void AbortOperation(); void CompleteOperation(); void SuspendOperation(const TSuspendOperationOptions& options); @@ -2328,7 +2330,13 @@ public: private: void OnStarted(const TOperationId& operationId); - void UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func); + struct TUpdateAttributesOptions + { + bool NeedJobStatistics = false; + bool NeedAlerts = false; + }; + + void UpdateAttributesAndCall(const TUpdateAttributesOptions& options, std::function<void(const TOperationAttributes&)> func); void SyncFinishOperationImpl(const TOperationAttributes&); static void* SyncFinishOperationProc(void* ); @@ -2494,7 +2502,7 @@ TString TOperation::TOperationImpl::GetStatus() } } TMaybe<TString> state; - UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) { + UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) { state = attributes.State; }); @@ -2556,7 +2564,7 @@ EOperationBriefState TOperation::TOperationImpl::GetBriefState() { ValidateOperationStarted(); EOperationBriefState result = EOperationBriefState::InProgress; - UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) { + UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) { Y_ABORT_UNLESS(attributes.BriefState, "get_operation for operation %s has not returned \"state\" field", GetGuidAsString(*Id_).data()); @@ -2569,7 +2577,7 @@ TMaybe<TYtError> TOperation::TOperationImpl::GetError() { ValidateOperationStarted(); TMaybe<TYtError> result; - UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) { + UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) { Y_ABORT_UNLESS(attributes.Result); result = attributes.Result->Error; }); @@ -2580,7 +2588,7 @@ TJobStatistics TOperation::TOperationImpl::GetJobStatistics() { ValidateOperationStarted(); TJobStatistics result; - UpdateAttributesAndCall(true, [&] (const TOperationAttributes& attributes) { + UpdateAttributesAndCall(/*options*/ {.NeedJobStatistics = true}, [&] (const TOperationAttributes& attributes) { if (attributes.Progress) { result = attributes.Progress->JobStatistics; } @@ -2594,17 +2602,27 @@ TMaybe<TOperationBriefProgress> TOperation::TOperationImpl::GetBriefProgress() { auto g = Guard(Lock_); if (CompletePromise_.Defined()) { - // Poller do this job for us + // Poller do this job for us. return Attributes_.BriefProgress; } } TMaybe<TOperationBriefProgress> result; - UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) { + UpdateAttributesAndCall(/*options*/ {}, [&] (const TOperationAttributes& attributes) { result = attributes.BriefProgress; }); return result; } +TMaybe<THashMap<TString, TYtError>> TOperation::TOperationImpl::GetAlerts() +{ + ValidateOperationStarted(); + TMaybe<THashMap<TString, TYtError>> result; + UpdateAttributesAndCall(/*options*/ {.NeedAlerts = true}, [&] (const TOperationAttributes& attributes) { + result = attributes.Alerts; + }); + return result; +} + void TOperation::TOperationImpl::UpdateBriefProgress(TMaybe<TOperationBriefProgress> briefProgress) { auto g = Guard(Lock_); @@ -2665,14 +2683,14 @@ void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId) } void TOperation::TOperationImpl::UpdateAttributesAndCall( - bool needJobStatistics, + const TUpdateAttributesOptions& options, std::function<void(const TOperationAttributes&)> func) { { auto g = Guard(Lock_); if (Attributes_.BriefState && *Attributes_.BriefState != EOperationBriefState::InProgress - && (!needJobStatistics || Attributes_.Progress)) + && (!options.NeedJobStatistics || Attributes_.Progress)) { func(Attributes_); return; @@ -2685,9 +2703,12 @@ void TOperation::TOperationImpl::UpdateAttributesAndCall( .Add(EOperationAttribute::BriefProgress); // To avoid overloading Cypress, we only request the progress attribute as needed, // typically when the user asks for job statistics. - if (needJobStatistics) { + if (options.NeedJobStatistics) { filter.Add(EOperationAttribute::Progress); } + if (options.NeedAlerts) { + filter.Add(EOperationAttribute::Alerts); + } auto attributes = RequestWithRetry<TOperationAttributes>( ClientRetryPolicy_->CreatePolicyForGenericRequest(), @@ -2991,6 +3012,11 @@ TMaybe<TOperationBriefProgress> TOperation::GetBriefProgress() return Impl_->GetBriefProgress(); } +TMaybe<THashMap<TString, TYtError>> TOperation::GetAlerts() +{ + return Impl_->GetAlerts(); +} + void TOperation::AbortOperation() { Impl_->AbortOperation(); |
