summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-17 01:13:52 +0300
committerhiddenpath <[email protected]>2024-12-17 02:03:47 +0300
commit5935906b0bfd05ea9cf84fc03e1b7d8befd2ff11 (patch)
tree9f72ba6c791e8374a21699eb820767c9fbaa16a8 /yt/cpp/mapreduce/client/operation.cpp
parentb570317a503ddb08ae344d96997c4ebb45002b8d (diff)
[yt/cpp/mapreduce] YT-23616: Move Job methods to THttpRawClient
commit_hash:bd11304f4147ff314372d4ab6049478143f60fd5
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp106
1 files changed, 59 insertions, 47 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 9f8c14647a5..98ed246e0e3 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -54,8 +54,6 @@
namespace NYT {
namespace NDetail {
-using namespace NRawClient;
-
using ::ToString;
////////////////////////////////////////////////////////////////////////////////
@@ -226,11 +224,11 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
return tableList;
}
- auto isDynamic = BatchTransform(
+ auto isDynamic = NRawClient::BatchTransform(
CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
preparer.GetContext(),
tableList,
- [&] (TRawBatchRequest& batch, const auto& table) {
+ [&] (NRawClient::TRawBatchRequest& batch, const auto& table) {
return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
});
@@ -306,8 +304,8 @@ TSimpleOperationIo CreateSimpleOperationIo(
}
};
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs());
- auto outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs());
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs());
+ auto outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs());
VerifyHasElements(inputs, "input");
VerifyHasElements(outputs, "output");
@@ -349,20 +347,19 @@ TSimpleOperationIo CreateSimpleOperationIo(
TString GetJobStderrWithRetriesAndIgnoreErrors(
const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TOperationId& operationId,
const TJobId& jobId,
const size_t stderrTailSize,
- const TGetJobStderrOptions& options = TGetJobStderrOptions())
+ const TGetJobStderrOptions& options = {})
{
TString jobStderr;
try {
- jobStderr = GetJobStderrWithRetries(
+ jobStderr = RequestWithRetry<TString>(
retryPolicy,
- context,
- operationId,
- jobId,
- options);
+ [&rawClient, &operationId, &jobId, &options] (TMutationId /*mutationId*/) {
+ return rawClient->GetJobStderrWithRetries(operationId, jobId, options);
+ });
} catch (const TErrorResponse& e) {
YT_LOG_ERROR("Cannot get job stderr (OperationId: %v, JobId: %v, Error: %v)",
operationId,
@@ -377,17 +374,19 @@ TString GetJobStderrWithRetriesAndIgnoreErrors(
TVector<TFailedJobInfo> GetFailedJobInfo(
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TOperationId& operationId,
const TGetFailedJobInfoOptions& options)
{
- const auto listJobsResult = ListJobs(
+ const auto listJobsResult = RequestWithRetry<TListJobsResult>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- operationId,
- TListJobsOptions()
- .State(EJobState::Failed)
- .Limit(options.MaxJobCount_));
+ [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) {
+ return rawClient->ListJobs(
+ operationId,
+ TListJobsOptions()
+ .State(EJobState::Failed)
+ .Limit(options.MaxJobCount_));
+ });
const auto stderrTailSize = options.StderrTailSize_;
@@ -405,7 +404,7 @@ TVector<TFailedJobInfo> GetFailedJobInfo(
// so we ignore all errors and try our luck on other jobs.
info.Stderr = GetJobStderrWithRetriesAndIgnoreErrors(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
+ rawClient,
operationId,
*job.Id,
stderrTailSize);
@@ -427,15 +426,20 @@ struct TGetJobsStderrOptions
static TVector<TString> GetJobsStderr(
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TOperationId& operationId,
- const TGetJobsStderrOptions& options = TGetJobsStderrOptions())
+ const TGetJobsStderrOptions& options = {})
{
- const auto listJobsResult = ListJobs(
+ const auto listJobsResult = RequestWithRetry<TListJobsResult>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- operationId,
- TListJobsOptions().Limit(options.MaxJobCount_).WithStderr(true));
+ [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) {
+ return rawClient->ListJobs(
+ operationId,
+ TListJobsOptions()
+ .Limit(options.MaxJobCount_)
+ .WithStderr(true));
+ });
+
const auto stderrTailSize = options.StderrTailSize_;
TVector<TString> result;
for (const auto& job : listJobsResult.Jobs) {
@@ -447,7 +451,7 @@ static TVector<TString> GetJobsStderr(
// so we ignore all errors and try our luck on other jobs.
GetJobStderrWithRetriesAndIgnoreErrors(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
+ rawClient,
operationId,
*job.Id,
stderrTailSize)
@@ -554,7 +558,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
EOperationBriefState CheckOperation(
const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
const TOperationId& operationId)
{
auto attributes = RequestWithRetry<TOperationAttributes>(
@@ -580,7 +583,7 @@ EOperationBriefState CheckOperation(
auto failedJobInfoList = GetFailedJobInfo(
clientRetryPolicy,
- context,
+ rawClient,
operationId,
TGetFailedJobInfoOptions());
@@ -608,7 +611,7 @@ void WaitForOperation(
: context.Config->OperationTrackerPollPeriod;
while (true) {
- auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId);
+ auto status = CheckOperation(rawClient, clientRetryPolicy, operationId);
if (status == EOperationBriefState::Completed) {
YT_LOG_INFO("Operation %v completed (%v)",
operationId,
@@ -1901,9 +1904,9 @@ void ExecuteRawMapReduce(
YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)",
preparer->GetPreparationId());
TMapReduceOperationIo operationIo;
- operationIo.Inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs());
- operationIo.MapOutputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs());
- operationIo.Outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs());
+ operationIo.Inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs());
+ operationIo.MapOutputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs());
+ operationIo.Outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs());
VerifyHasElements(operationIo.Inputs, "inputs");
VerifyHasElements(operationIo.Outputs, "outputs");
@@ -1950,8 +1953,8 @@ void ExecuteSort(
{
YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
@@ -1999,8 +2002,8 @@ void ExecuteMerge(
{
YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
@@ -2049,7 +2052,7 @@ void ExecuteErase(
{
YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto tablePath = CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
+ auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
TNode specNode = BuildYsonNodeFluently()
.BeginMap().Item("spec").BeginMap()
@@ -2085,8 +2088,8 @@ void ExecuteRemoteCopy(
{
YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
if (options.CreateOutputTables_) {
CreateOutputTable(*preparer, output);
@@ -2340,7 +2343,7 @@ public:
: OperationImpl_(std::move(operationImpl))
{ }
- void PrepareRequest(TRawBatchRequest* batchRequest) override
+ void PrepareRequest(NRawClient::TRawBatchRequest* batchRequest) override
{
auto filter = TOperationAttributeFilter()
.Add(EOperationAttribute::State)
@@ -2744,13 +2747,22 @@ void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParamete
TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, jobId, options);
+ auto result = RequestWithRetry<NYson::TYsonString>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &jobId, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetJob(*Id_, jobId, options);
+ });
+ return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf()));
}
TListJobsResult TOperation::TOperationImpl::ListJobs(const TListJobsOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ return RequestWithRetry<TListJobsResult>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->ListJobs(*Id_, options);
+ });
}
struct TAsyncFinishOperationsArgs
@@ -2816,7 +2828,7 @@ void TOperation::TOperationImpl::SyncFinishOperationImpl(const TOperationAttribu
TVector<TFailedJobInfo> failedJobStderrInfo;
if (*attributes.BriefState == EOperationBriefState::Failed) {
try {
- failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, Context_, *Id_, TGetFailedJobInfoOptions());
+ failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, RawClient_, *Id_, TGetFailedJobInfoOptions());
} catch (const std::exception& e) {
additionalExceptionText = "Cannot get job stderrs: ";
additionalExceptionText += e.what();
@@ -2929,7 +2941,7 @@ void TOperation::OnStatusUpdated(const TString& newStatus)
TVector<TFailedJobInfo> TOperation::GetFailedJobInfo(const TGetFailedJobInfoOptions& options)
{
- return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetContext(), GetId(), options);
+ return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetRawClient(), GetId(), options);
}
EOperationBriefState TOperation::GetBriefState()
@@ -3051,7 +3063,7 @@ void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, co
TWaitProxy::Get()->WaitFuture(finishedFuture);
finishedFuture.GetValue();
if (context.Config->WriteStderrSuccessfulJobs) {
- auto stderrs = GetJobsStderr(retryPolicy, context, operation->GetId());
+ auto stderrs = GetJobsStderr(retryPolicy, client->GetRawClient(), operation->GetId());
for (const auto& jobStderr : stderrs) {
if (!jobStderr.empty()) {
Cerr << jobStderr << '\n';