diff options
| author | hiddenpath <[email protected]> | 2024-12-17 01:13:52 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-17 02:03:47 +0300 |
| commit | 5935906b0bfd05ea9cf84fc03e1b7d8befd2ff11 (patch) | |
| tree | 9f72ba6c791e8374a21699eb820767c9fbaa16a8 /yt/cpp/mapreduce/client/operation.cpp | |
| parent | b570317a503ddb08ae344d96997c4ebb45002b8d (diff) | |
[yt/cpp/mapreduce] YT-23616: Move Job methods to THttpRawClient
commit_hash:bd11304f4147ff314372d4ab6049478143f60fd5
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 106 |
1 files changed, 59 insertions, 47 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 9f8c14647a5..98ed246e0e3 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -54,8 +54,6 @@ namespace NYT { namespace NDetail { -using namespace NRawClient; - using ::ToString; //////////////////////////////////////////////////////////////////////////////// @@ -226,11 +224,11 @@ TStructuredJobTableList ApplyProtobufColumnFilters( return tableList; } - auto isDynamic = BatchTransform( + auto isDynamic = NRawClient::BatchTransform( CreateDefaultRequestRetryPolicy(preparer.GetContext().Config), preparer.GetContext(), tableList, - [&] (TRawBatchRequest& batch, const auto& table) { + [&] (NRawClient::TRawBatchRequest& batch, const auto& table) { return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions()); }); @@ -306,8 +304,8 @@ TSimpleOperationIo CreateSimpleOperationIo( } }; - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs()); - auto outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs()); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs()); + auto outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs()); VerifyHasElements(inputs, "input"); VerifyHasElements(outputs, "output"); @@ -349,20 +347,19 @@ TSimpleOperationIo CreateSimpleOperationIo( TString GetJobStderrWithRetriesAndIgnoreErrors( const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TOperationId& operationId, const TJobId& jobId, const size_t stderrTailSize, - const TGetJobStderrOptions& options = TGetJobStderrOptions()) + const TGetJobStderrOptions& options = {}) { TString jobStderr; try { - jobStderr = GetJobStderrWithRetries( + jobStderr = RequestWithRetry<TString>( retryPolicy, - context, - operationId, - jobId, - options); + [&rawClient, &operationId, &jobId, &options] (TMutationId /*mutationId*/) { + return rawClient->GetJobStderrWithRetries(operationId, jobId, options); + }); } catch (const TErrorResponse& e) { YT_LOG_ERROR("Cannot get job stderr (OperationId: %v, JobId: %v, Error: %v)", operationId, @@ -377,17 +374,19 @@ TString GetJobStderrWithRetriesAndIgnoreErrors( TVector<TFailedJobInfo> GetFailedJobInfo( const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TOperationId& operationId, const TGetFailedJobInfoOptions& options) { - const auto listJobsResult = ListJobs( + const auto listJobsResult = RequestWithRetry<TListJobsResult>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - operationId, - TListJobsOptions() - .State(EJobState::Failed) - .Limit(options.MaxJobCount_)); + [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) { + return rawClient->ListJobs( + operationId, + TListJobsOptions() + .State(EJobState::Failed) + .Limit(options.MaxJobCount_)); + }); const auto stderrTailSize = options.StderrTailSize_; @@ -405,7 +404,7 @@ TVector<TFailedJobInfo> GetFailedJobInfo( // so we ignore all errors and try our luck on other jobs. info.Stderr = GetJobStderrWithRetriesAndIgnoreErrors( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, + rawClient, operationId, *job.Id, stderrTailSize); @@ -427,15 +426,20 @@ struct TGetJobsStderrOptions static TVector<TString> GetJobsStderr( const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TOperationId& operationId, - const TGetJobsStderrOptions& options = TGetJobsStderrOptions()) + const TGetJobsStderrOptions& options = {}) { - const auto listJobsResult = ListJobs( + const auto listJobsResult = RequestWithRetry<TListJobsResult>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - operationId, - TListJobsOptions().Limit(options.MaxJobCount_).WithStderr(true)); + [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) { + return rawClient->ListJobs( + operationId, + TListJobsOptions() + .Limit(options.MaxJobCount_) + .WithStderr(true)); + }); + const auto stderrTailSize = options.StderrTailSize_; TVector<TString> result; for (const auto& job : listJobsResult.Jobs) { @@ -447,7 +451,7 @@ static TVector<TString> GetJobsStderr( // so we ignore all errors and try our luck on other jobs. GetJobStderrWithRetriesAndIgnoreErrors( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, + rawClient, operationId, *job.Id, stderrTailSize) @@ -554,7 +558,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( EOperationBriefState CheckOperation( const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, const TOperationId& operationId) { auto attributes = RequestWithRetry<TOperationAttributes>( @@ -580,7 +583,7 @@ EOperationBriefState CheckOperation( auto failedJobInfoList = GetFailedJobInfo( clientRetryPolicy, - context, + rawClient, operationId, TGetFailedJobInfoOptions()); @@ -608,7 +611,7 @@ void WaitForOperation( : context.Config->OperationTrackerPollPeriod; while (true) { - auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId); + auto status = CheckOperation(rawClient, clientRetryPolicy, operationId); if (status == EOperationBriefState::Completed) { YT_LOG_INFO("Operation %v completed (%v)", operationId, @@ -1901,9 +1904,9 @@ void ExecuteRawMapReduce( YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)", preparer->GetPreparationId()); TMapReduceOperationIo operationIo; - operationIo.Inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs()); - operationIo.MapOutputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs()); - operationIo.Outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs()); + operationIo.Inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs()); + operationIo.MapOutputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs()); + operationIo.Outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs()); VerifyHasElements(operationIo.Inputs, "inputs"); VerifyHasElements(operationIo.Outputs, "outputs"); @@ -1950,8 +1953,8 @@ void ExecuteSort( { YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); if (options.CreateOutputTables_) { CheckInputTablesExist(*preparer, inputs); @@ -1999,8 +2002,8 @@ void ExecuteMerge( { YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); if (options.CreateOutputTables_) { CheckInputTablesExist(*preparer, inputs); @@ -2049,7 +2052,7 @@ void ExecuteErase( { YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)", preparer->GetPreparationId()); - auto tablePath = CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); + auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); TNode specNode = BuildYsonNodeFluently() .BeginMap().Item("spec").BeginMap() @@ -2085,8 +2088,8 @@ void ExecuteRemoteCopy( { YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); if (options.CreateOutputTables_) { CreateOutputTable(*preparer, output); @@ -2340,7 +2343,7 @@ public: : OperationImpl_(std::move(operationImpl)) { } - void PrepareRequest(TRawBatchRequest* batchRequest) override + void PrepareRequest(NRawClient::TRawBatchRequest* batchRequest) override { auto filter = TOperationAttributeFilter() .Add(EOperationAttribute::State) @@ -2744,13 +2747,22 @@ void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParamete TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, jobId, options); + auto result = RequestWithRetry<NYson::TYsonString>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &jobId, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetJob(*Id_, jobId, options); + }); + return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf())); } TListJobsResult TOperation::TOperationImpl::ListJobs(const TListJobsOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + return RequestWithRetry<TListJobsResult>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId /*mutationId*/) { + return RawClient_->ListJobs(*Id_, options); + }); } struct TAsyncFinishOperationsArgs @@ -2816,7 +2828,7 @@ void TOperation::TOperationImpl::SyncFinishOperationImpl(const TOperationAttribu TVector<TFailedJobInfo> failedJobStderrInfo; if (*attributes.BriefState == EOperationBriefState::Failed) { try { - failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, Context_, *Id_, TGetFailedJobInfoOptions()); + failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, RawClient_, *Id_, TGetFailedJobInfoOptions()); } catch (const std::exception& e) { additionalExceptionText = "Cannot get job stderrs: "; additionalExceptionText += e.what(); @@ -2929,7 +2941,7 @@ void TOperation::OnStatusUpdated(const TString& newStatus) TVector<TFailedJobInfo> TOperation::GetFailedJobInfo(const TGetFailedJobInfoOptions& options) { - return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetContext(), GetId(), options); + return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetRawClient(), GetId(), options); } EOperationBriefState TOperation::GetBriefState() @@ -3051,7 +3063,7 @@ void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, co TWaitProxy::Get()->WaitFuture(finishedFuture); finishedFuture.GetValue(); if (context.Config->WriteStderrSuccessfulJobs) { - auto stderrs = GetJobsStderr(retryPolicy, context, operation->GetId()); + auto stderrs = GetJobsStderr(retryPolicy, client->GetRawClient(), operation->GetId()); for (const auto& jobStderr : stderrs) { if (!jobStderr.empty()) { Cerr << jobStderr << '\n'; |
