diff options
| author | hiddenpath <[email protected]> | 2024-12-16 18:45:28 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-16 19:11:27 +0300 |
| commit | dae2dbe3496d7557b1ece64d5464bd8e686995a8 (patch) | |
| tree | 72899b4fb6279a620a14f71005d81afbbe93cd35 /yt/cpp/mapreduce/client/operation.cpp | |
| parent | b1cde7dcb055fb6f3367e81fd0f57bd55b8bb93c (diff) | |
[yt/cpp/mapreduce] YT-23616: Move Transaction and Operation methods to THttpRawClient
commit_hash:b093be44005f3d9da9779444cbbc32b93f7372ee
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 96 |
1 files changed, 69 insertions, 27 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index c5a9d7f5231..9f8c14647a5 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -552,17 +552,21 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( } EOperationBriefState CheckOperation( + const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, const TClientContext& context, const TOperationId& operationId) { - auto attributes = GetOperation( + auto attributes = RequestWithRetry<TOperationAttributes>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - operationId, - TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() - .Add(EOperationAttribute::State) - .Add(EOperationAttribute::Result))); + [&rawClient, &operationId] (TMutationId /*mutationId*/) { + return rawClient->GetOperation( + operationId, + TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() + .Add(EOperationAttribute::State) + .Add(EOperationAttribute::Result))); + }); + Y_ABORT_UNLESS(attributes.BriefState, "get_operation for operation %s has not returned \"state\" field", GetGuidAsString(operationId).data()); @@ -604,7 +608,7 @@ void WaitForOperation( : context.Config->OperationTrackerPollPeriod; while (true) { - auto status = CheckOperation(clientRetryPolicy, context, operationId); + auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId); if (status == EOperationBriefState::Completed) { YT_LOG_INFO("Operation %v completed (%v)", operationId, @@ -2237,11 +2241,13 @@ class TOperation::TOperationImpl { public: TOperationImpl( + IRawClientPtr rawClient, IClientRetryPolicyPtr clientRetryPolicy, TClientContext context, const TMaybe<TOperationId>& operationId = {}) - : ClientRetryPolicy_(clientRetryPolicy) + : RawClient_(std::move(rawClient)) , Context_(std::move(context)) + , ClientRetryPolicy_(clientRetryPolicy) , Id_(operationId) , PreparedPromise_(::NThreading::NewPromise<void>()) , StartedPromise_(::NThreading::NewPromise<void>()) @@ -2308,8 +2314,10 @@ private: void ValidateOperationStarted() const; private: - IClientRetryPolicyPtr ClientRetryPolicy_; + const IRawClientPtr RawClient_; const TClientContext Context_; + + IClientRetryPolicyPtr ClientRetryPolicy_; TMaybe<TOperationId> Id_; TMutex Lock_; @@ -2509,7 +2517,7 @@ void TOperation::TOperationImpl::OnStatusUpdated(const TString& newStatus) auto registry = TAbortableRegistry::Get(); registry->Add( operationId, - ::MakeIntrusive<TOperationAbortable>(this_->ClientRetryPolicy_, this_->Context_, operationId)); + ::MakeIntrusive<TOperationAbortable>(this_->RawClient_, this_->ClientRetryPolicy_, operationId)); // We have to own an IntrusivePtr to registry to prevent use-after-free auto removeOperation = [registry, operationId] (const ::NThreading::TFuture<void>&) { registry->Remove(operationId); @@ -2632,7 +2640,9 @@ void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId) StartedPromise_.SetValue(); } -void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func) +void TOperation::TOperationImpl::UpdateAttributesAndCall( + bool needJobStatistics, + std::function<void(const TOperationAttributes&)> func) { { auto g = Guard(Lock_); @@ -2645,15 +2655,17 @@ void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics, } } - TOperationAttributes attributes = NDetail::GetOperation( + auto attributes = RequestWithRetry<TOperationAttributes>( ClientRetryPolicy_->CreatePolicyForGenericRequest(), - Context_, - *Id_, - TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() - .Add(EOperationAttribute::Result) - .Add(EOperationAttribute::Progress) - .Add(EOperationAttribute::State) - .Add(EOperationAttribute::BriefProgress))); + [this] (TMutationId /*mutationId*/) { + return RawClient_->GetOperation( + *Id_, + TGetOperationOptions().AttributeFilter(TOperationAttributeFilter() + .Add(EOperationAttribute::Result) + .Add(EOperationAttribute::Progress) + .Add(EOperationAttribute::State) + .Add(EOperationAttribute::BriefProgress))); + }); func(attributes); @@ -2672,37 +2684,61 @@ void TOperation::TOperationImpl::FinishWithException(std::exception_ptr e) void TOperation::TOperationImpl::AbortOperation() { ValidateOperationStarted(); - NYT::NDetail::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId& mutationId) { + RawClient_->AbortOperation(mutationId, *Id_); + }); } void TOperation::TOperationImpl::CompleteOperation() { ValidateOperationStarted(); - NYT::NDetail::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this] (TMutationId& mutationId) { + RawClient_->CompleteOperation(mutationId, *Id_); + }); } void TOperation::TOperationImpl::SuspendOperation(const TSuspendOperationOptions& options) { ValidateOperationStarted(); - NYT::NDetail::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId& mutationId) { + RawClient_->SuspendOperation(mutationId, *Id_, options); + }); } void TOperation::TOperationImpl::ResumeOperation(const TResumeOperationOptions& options) { ValidateOperationStarted(); - NYT::NDetail::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId& mutationId) { + RawClient_->ResumeOperation(mutationId, *Id_, options); + }); } TOperationAttributes TOperation::TOperationImpl::GetAttributes(const TGetOperationOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + return RequestWithRetry<TOperationAttributes>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetOperation(*Id_, options); + }); } void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParametersOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId /*mutationId*/) { + RawClient_->UpdateOperationParameters(*Id_, options); + }); } TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options) @@ -2815,13 +2851,19 @@ const TClientContext& TOperation::TOperationImpl::GetContext() const TOperation::TOperation(TClientPtr client) : Client_(std::move(client)) - , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext())) + , Impl_(::MakeIntrusive<TOperationImpl>( + Client_->GetRawClient(), + Client_->GetRetryPolicy(), + Client_->GetContext())) { } TOperation::TOperation(TOperationId id, TClientPtr client) : Client_(std::move(client)) - , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext(), id)) + , Impl_(::MakeIntrusive<TOperationImpl>( + Client_->GetRawClient(), + Client_->GetRetryPolicy(), + Client_->GetContext(), id)) { } |
