summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-16 18:45:28 +0300
committerhiddenpath <[email protected]>2024-12-16 19:11:27 +0300
commitdae2dbe3496d7557b1ece64d5464bd8e686995a8 (patch)
tree72899b4fb6279a620a14f71005d81afbbe93cd35 /yt/cpp/mapreduce/client/operation.cpp
parentb1cde7dcb055fb6f3367e81fd0f57bd55b8bb93c (diff)
[yt/cpp/mapreduce] YT-23616: Move Transaction and Operation methods to THttpRawClient
commit_hash:b093be44005f3d9da9779444cbbc32b93f7372ee
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp96
1 files changed, 69 insertions, 27 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index c5a9d7f5231..9f8c14647a5 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -552,17 +552,21 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
}
EOperationBriefState CheckOperation(
+ const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TClientContext& context,
const TOperationId& operationId)
{
- auto attributes = GetOperation(
+ auto attributes = RequestWithRetry<TOperationAttributes>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- operationId,
- TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
- .Add(EOperationAttribute::State)
- .Add(EOperationAttribute::Result)));
+ [&rawClient, &operationId] (TMutationId /*mutationId*/) {
+ return rawClient->GetOperation(
+ operationId,
+ TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
+ .Add(EOperationAttribute::State)
+ .Add(EOperationAttribute::Result)));
+ });
+
Y_ABORT_UNLESS(attributes.BriefState,
"get_operation for operation %s has not returned \"state\" field",
GetGuidAsString(operationId).data());
@@ -604,7 +608,7 @@ void WaitForOperation(
: context.Config->OperationTrackerPollPeriod;
while (true) {
- auto status = CheckOperation(clientRetryPolicy, context, operationId);
+ auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId);
if (status == EOperationBriefState::Completed) {
YT_LOG_INFO("Operation %v completed (%v)",
operationId,
@@ -2237,11 +2241,13 @@ class TOperation::TOperationImpl
{
public:
TOperationImpl(
+ IRawClientPtr rawClient,
IClientRetryPolicyPtr clientRetryPolicy,
TClientContext context,
const TMaybe<TOperationId>& operationId = {})
- : ClientRetryPolicy_(clientRetryPolicy)
+ : RawClient_(std::move(rawClient))
, Context_(std::move(context))
+ , ClientRetryPolicy_(clientRetryPolicy)
, Id_(operationId)
, PreparedPromise_(::NThreading::NewPromise<void>())
, StartedPromise_(::NThreading::NewPromise<void>())
@@ -2308,8 +2314,10 @@ private:
void ValidateOperationStarted() const;
private:
- IClientRetryPolicyPtr ClientRetryPolicy_;
+ const IRawClientPtr RawClient_;
const TClientContext Context_;
+
+ IClientRetryPolicyPtr ClientRetryPolicy_;
TMaybe<TOperationId> Id_;
TMutex Lock_;
@@ -2509,7 +2517,7 @@ void TOperation::TOperationImpl::OnStatusUpdated(const TString& newStatus)
auto registry = TAbortableRegistry::Get();
registry->Add(
operationId,
- ::MakeIntrusive<TOperationAbortable>(this_->ClientRetryPolicy_, this_->Context_, operationId));
+ ::MakeIntrusive<TOperationAbortable>(this_->RawClient_, this_->ClientRetryPolicy_, operationId));
// We have to own an IntrusivePtr to registry to prevent use-after-free
auto removeOperation = [registry, operationId] (const ::NThreading::TFuture<void>&) {
registry->Remove(operationId);
@@ -2632,7 +2640,9 @@ void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId)
StartedPromise_.SetValue();
}
-void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func)
+void TOperation::TOperationImpl::UpdateAttributesAndCall(
+ bool needJobStatistics,
+ std::function<void(const TOperationAttributes&)> func)
{
{
auto g = Guard(Lock_);
@@ -2645,15 +2655,17 @@ void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics,
}
}
- TOperationAttributes attributes = NDetail::GetOperation(
+ auto attributes = RequestWithRetry<TOperationAttributes>(
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- Context_,
- *Id_,
- TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
- .Add(EOperationAttribute::Result)
- .Add(EOperationAttribute::Progress)
- .Add(EOperationAttribute::State)
- .Add(EOperationAttribute::BriefProgress)));
+ [this] (TMutationId /*mutationId*/) {
+ return RawClient_->GetOperation(
+ *Id_,
+ TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
+ .Add(EOperationAttribute::Result)
+ .Add(EOperationAttribute::Progress)
+ .Add(EOperationAttribute::State)
+ .Add(EOperationAttribute::BriefProgress)));
+ });
func(attributes);
@@ -2672,37 +2684,61 @@ void TOperation::TOperationImpl::FinishWithException(std::exception_ptr e)
void TOperation::TOperationImpl::AbortOperation()
{
ValidateOperationStarted();
- NYT::NDetail::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId& mutationId) {
+ RawClient_->AbortOperation(mutationId, *Id_);
+ });
}
void TOperation::TOperationImpl::CompleteOperation()
{
ValidateOperationStarted();
- NYT::NDetail::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this] (TMutationId& mutationId) {
+ RawClient_->CompleteOperation(mutationId, *Id_);
+ });
}
void TOperation::TOperationImpl::SuspendOperation(const TSuspendOperationOptions& options)
{
ValidateOperationStarted();
- NYT::NDetail::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId& mutationId) {
+ RawClient_->SuspendOperation(mutationId, *Id_, options);
+ });
}
void TOperation::TOperationImpl::ResumeOperation(const TResumeOperationOptions& options)
{
ValidateOperationStarted();
- NYT::NDetail::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId& mutationId) {
+ RawClient_->ResumeOperation(mutationId, *Id_, options);
+ });
}
TOperationAttributes TOperation::TOperationImpl::GetAttributes(const TGetOperationOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ return RequestWithRetry<TOperationAttributes>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetOperation(*Id_, options);
+ });
}
void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParametersOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId /*mutationId*/) {
+ RawClient_->UpdateOperationParameters(*Id_, options);
+ });
}
TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options)
@@ -2815,13 +2851,19 @@ const TClientContext& TOperation::TOperationImpl::GetContext() const
TOperation::TOperation(TClientPtr client)
: Client_(std::move(client))
- , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext()))
+ , Impl_(::MakeIntrusive<TOperationImpl>(
+ Client_->GetRawClient(),
+ Client_->GetRetryPolicy(),
+ Client_->GetContext()))
{
}
TOperation::TOperation(TOperationId id, TClientPtr client)
: Client_(std::move(client))
- , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext(), id))
+ , Impl_(::MakeIntrusive<TOperationImpl>(
+ Client_->GetRawClient(),
+ Client_->GetRetryPolicy(),
+ Client_->GetContext(), id))
{
}