diff options
| author | hiddenpath <[email protected]> | 2024-12-17 01:13:52 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-17 02:03:47 +0300 |
| commit | 5935906b0bfd05ea9cf84fc03e1b7d8befd2ff11 (patch) | |
| tree | 9f72ba6c791e8374a21699eb820767c9fbaa16a8 /yt/cpp | |
| parent | b570317a503ddb08ae344d96997c4ebb45002b8d (diff) | |
[yt/cpp/mapreduce] YT-23616: Move Job methods to THttpRawClient
commit_hash:bd11304f4147ff314372d4ab6049478143f60fd5
Diffstat (limited to 'yt/cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 31 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/file_reader.cpp | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/fwd.h | 16 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 106 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.h | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 31 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 181 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 34 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 186 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 49 |
10 files changed, 329 insertions, 311 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 66cacf6b456..3177ea9d6a5 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -53,8 +53,6 @@ #include <util/string/type.h> #include <util/system/env.h> -using namespace NYT::NDetail::NRawClient; - namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -711,7 +709,7 @@ IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId) EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId) { - return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, Context_, operationId); + return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, operationId); } void TClientBase::AbortOperation(const TOperationId& operationId) @@ -1138,7 +1136,7 @@ void TClient::InsertRows( THttpHeader header("PUT", "insert_rows"); header.SetInputFormat(TFormat::YsonBinary()); // TODO: use corresponding raw request - header.MergeParameters(SerializeParametersForInsertRows(Context_.Config->Prefix, path, options)); + header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options)); auto body = NodeListToYsonString(rows); TRequestConfig config; @@ -1324,7 +1322,12 @@ TJobAttributes TClient::GetJob( const TGetJobOptions& options) { CheckShutdown(); - return NRawClient::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, jobId, options); + auto result = RequestWithRetry<NYson::TYsonString>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId, &jobId, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetJob(operationId, jobId, options); + }); + return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf())); } TListJobsResult TClient::ListJobs( @@ -1332,7 +1335,11 @@ TListJobsResult TClient::ListJobs( const TListJobsOptions& options) { CheckShutdown(); - return NRawClient::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); + return RequestWithRetry<TListJobsResult>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId, &options] (TMutationId /*mutationId*/) { + return RawClient_->ListJobs(operationId, options); + }); } IFileReaderPtr TClient::GetJobInput( @@ -1340,7 +1347,7 @@ IFileReaderPtr TClient::GetJobInput( const TGetJobInputOptions& options) { CheckShutdown(); - return NRawClient::GetJobInput(Context_, jobId, options); + return RawClient_->GetJobInput(jobId, options); } IFileReaderPtr TClient::GetJobFailContext( @@ -1349,7 +1356,7 @@ IFileReaderPtr TClient::GetJobFailContext( const TGetJobFailContextOptions& options) { CheckShutdown(); - return NRawClient::GetJobFailContext(Context_, operationId, jobId, options); + return RawClient_->GetJobFailContext(operationId, jobId, options); } IFileReaderPtr TClient::GetJobStderr( @@ -1358,7 +1365,7 @@ IFileReaderPtr TClient::GetJobStderr( const TGetJobStderrOptions& options) { CheckShutdown(); - return NRawClient::GetJobStderr(Context_, operationId, jobId, options); + return RawClient_->GetJobStderr(operationId, jobId, options); } std::vector<TJobTraceEvent> TClient::GetJobTrace( @@ -1366,7 +1373,11 @@ std::vector<TJobTraceEvent> TClient::GetJobTrace( const TGetJobTraceOptions& options) { CheckShutdown(); - return NRawClient::GetJobTrace(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options); + return RequestWithRetry<std::vector<TJobTraceEvent>>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &operationId, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetJobTrace(operationId, options); + }); } TNode::TListType TClient::SkyShareTable( diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index 408650bda90..06463d0af24 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -98,7 +98,7 @@ size_t TStreamReaderBase::DoRead(void* buf, size_t len) if (!IsRetriable(e) || attempt == retryCount) { throw; } - NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); + TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); } catch (std::exception& e) { YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)", GetActiveRequestId(), @@ -112,7 +112,7 @@ size_t TStreamReaderBase::DoRead(void* buf, size_t len) if (attempt == retryCount) { throw; } - NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); + TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config)); } Input_ = nullptr; } diff --git a/yt/cpp/mapreduce/client/fwd.h b/yt/cpp/mapreduce/client/fwd.h deleted file mode 100644 index d4449d4ac10..00000000000 --- a/yt/cpp/mapreduce/client/fwd.h +++ /dev/null @@ -1,16 +0,0 @@ -#pragma once - -#include <util/generic/ptr.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -class TPingableTransaction; - -class TClient; -using TClientPtr = ::TIntrusivePtr<TClient>; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 9f8c14647a5..98ed246e0e3 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -54,8 +54,6 @@ namespace NYT { namespace NDetail { -using namespace NRawClient; - using ::ToString; //////////////////////////////////////////////////////////////////////////////// @@ -226,11 +224,11 @@ TStructuredJobTableList ApplyProtobufColumnFilters( return tableList; } - auto isDynamic = BatchTransform( + auto isDynamic = NRawClient::BatchTransform( CreateDefaultRequestRetryPolicy(preparer.GetContext().Config), preparer.GetContext(), tableList, - [&] (TRawBatchRequest& batch, const auto& table) { + [&] (NRawClient::TRawBatchRequest& batch, const auto& table) { return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions()); }); @@ -306,8 +304,8 @@ TSimpleOperationIo CreateSimpleOperationIo( } }; - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs()); - auto outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs()); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs()); + auto outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs()); VerifyHasElements(inputs, "input"); VerifyHasElements(outputs, "output"); @@ -349,20 +347,19 @@ TSimpleOperationIo CreateSimpleOperationIo( TString GetJobStderrWithRetriesAndIgnoreErrors( const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TOperationId& operationId, const TJobId& jobId, const size_t stderrTailSize, - const TGetJobStderrOptions& options = TGetJobStderrOptions()) + const TGetJobStderrOptions& options = {}) { TString jobStderr; try { - jobStderr = GetJobStderrWithRetries( + jobStderr = RequestWithRetry<TString>( retryPolicy, - context, - operationId, - jobId, - options); + [&rawClient, &operationId, &jobId, &options] (TMutationId /*mutationId*/) { + return rawClient->GetJobStderrWithRetries(operationId, jobId, options); + }); } catch (const TErrorResponse& e) { YT_LOG_ERROR("Cannot get job stderr (OperationId: %v, JobId: %v, Error: %v)", operationId, @@ -377,17 +374,19 @@ TString GetJobStderrWithRetriesAndIgnoreErrors( TVector<TFailedJobInfo> GetFailedJobInfo( const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TOperationId& operationId, const TGetFailedJobInfoOptions& options) { - const auto listJobsResult = ListJobs( + const auto listJobsResult = RequestWithRetry<TListJobsResult>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - operationId, - TListJobsOptions() - .State(EJobState::Failed) - .Limit(options.MaxJobCount_)); + [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) { + return rawClient->ListJobs( + operationId, + TListJobsOptions() + .State(EJobState::Failed) + .Limit(options.MaxJobCount_)); + }); const auto stderrTailSize = options.StderrTailSize_; @@ -405,7 +404,7 @@ TVector<TFailedJobInfo> GetFailedJobInfo( // so we ignore all errors and try our luck on other jobs. info.Stderr = GetJobStderrWithRetriesAndIgnoreErrors( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, + rawClient, operationId, *job.Id, stderrTailSize); @@ -427,15 +426,20 @@ struct TGetJobsStderrOptions static TVector<TString> GetJobsStderr( const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TOperationId& operationId, - const TGetJobsStderrOptions& options = TGetJobsStderrOptions()) + const TGetJobsStderrOptions& options = {}) { - const auto listJobsResult = ListJobs( + const auto listJobsResult = RequestWithRetry<TListJobsResult>( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - operationId, - TListJobsOptions().Limit(options.MaxJobCount_).WithStderr(true)); + [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) { + return rawClient->ListJobs( + operationId, + TListJobsOptions() + .Limit(options.MaxJobCount_) + .WithStderr(true)); + }); + const auto stderrTailSize = options.StderrTailSize_; TVector<TString> result; for (const auto& job : listJobsResult.Jobs) { @@ -447,7 +451,7 @@ static TVector<TString> GetJobsStderr( // so we ignore all errors and try our luck on other jobs. GetJobStderrWithRetriesAndIgnoreErrors( clientRetryPolicy->CreatePolicyForGenericRequest(), - context, + rawClient, operationId, *job.Id, stderrTailSize) @@ -554,7 +558,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( EOperationBriefState CheckOperation( const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, const TOperationId& operationId) { auto attributes = RequestWithRetry<TOperationAttributes>( @@ -580,7 +583,7 @@ EOperationBriefState CheckOperation( auto failedJobInfoList = GetFailedJobInfo( clientRetryPolicy, - context, + rawClient, operationId, TGetFailedJobInfoOptions()); @@ -608,7 +611,7 @@ void WaitForOperation( : context.Config->OperationTrackerPollPeriod; while (true) { - auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId); + auto status = CheckOperation(rawClient, clientRetryPolicy, operationId); if (status == EOperationBriefState::Completed) { YT_LOG_INFO("Operation %v completed (%v)", operationId, @@ -1901,9 +1904,9 @@ void ExecuteRawMapReduce( YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)", preparer->GetPreparationId()); TMapReduceOperationIo operationIo; - operationIo.Inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs()); - operationIo.MapOutputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs()); - operationIo.Outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs()); + operationIo.Inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs()); + operationIo.MapOutputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs()); + operationIo.Outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs()); VerifyHasElements(operationIo.Inputs, "inputs"); VerifyHasElements(operationIo.Outputs, "outputs"); @@ -1950,8 +1953,8 @@ void ExecuteSort( { YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); if (options.CreateOutputTables_) { CheckInputTablesExist(*preparer, inputs); @@ -1999,8 +2002,8 @@ void ExecuteMerge( { YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); if (options.CreateOutputTables_) { CheckInputTablesExist(*preparer, inputs); @@ -2049,7 +2052,7 @@ void ExecuteErase( { YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)", preparer->GetPreparationId()); - auto tablePath = CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); + auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); TNode specNode = BuildYsonNodeFluently() .BeginMap().Item("spec").BeginMap() @@ -2085,8 +2088,8 @@ void ExecuteRemoteCopy( { YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); if (options.CreateOutputTables_) { CreateOutputTable(*preparer, output); @@ -2340,7 +2343,7 @@ public: : OperationImpl_(std::move(operationImpl)) { } - void PrepareRequest(TRawBatchRequest* batchRequest) override + void PrepareRequest(NRawClient::TRawBatchRequest* batchRequest) override { auto filter = TOperationAttributeFilter() .Add(EOperationAttribute::State) @@ -2744,13 +2747,22 @@ void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParamete TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, jobId, options); + auto result = RequestWithRetry<NYson::TYsonString>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &jobId, &options] (TMutationId /*mutationId*/) { + return RawClient_->GetJob(*Id_, jobId, options); + }); + return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf())); } TListJobsResult TOperation::TOperationImpl::ListJobs(const TListJobsOptions& options) { ValidateOperationStarted(); - return NYT::NDetail::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options); + return RequestWithRetry<TListJobsResult>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &options] (TMutationId /*mutationId*/) { + return RawClient_->ListJobs(*Id_, options); + }); } struct TAsyncFinishOperationsArgs @@ -2816,7 +2828,7 @@ void TOperation::TOperationImpl::SyncFinishOperationImpl(const TOperationAttribu TVector<TFailedJobInfo> failedJobStderrInfo; if (*attributes.BriefState == EOperationBriefState::Failed) { try { - failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, Context_, *Id_, TGetFailedJobInfoOptions()); + failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, RawClient_, *Id_, TGetFailedJobInfoOptions()); } catch (const std::exception& e) { additionalExceptionText = "Cannot get job stderrs: "; additionalExceptionText += e.what(); @@ -2929,7 +2941,7 @@ void TOperation::OnStatusUpdated(const TString& newStatus) TVector<TFailedJobInfo> TOperation::GetFailedJobInfo(const TGetFailedJobInfoOptions& options) { - return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetContext(), GetId(), options); + return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetRawClient(), GetId(), options); } EOperationBriefState TOperation::GetBriefState() @@ -3051,7 +3063,7 @@ void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, co TWaitProxy::Get()->WaitFuture(finishedFuture); finishedFuture.GetValue(); if (context.Config->WriteStderrSuccessfulJobs) { - auto stderrs = GetJobsStderr(retryPolicy, context, operation->GetId()); + auto stderrs = GetJobsStderr(retryPolicy, client->GetRawClient(), operation->GetId()); for (const auto& jobStderr : stderrs) { if (!jobStderr.empty()) { Cerr << jobStderr << '\n'; diff --git a/yt/cpp/mapreduce/client/operation.h b/yt/cpp/mapreduce/client/operation.h index e5e675a1f48..06eeac492fc 100644 --- a/yt/cpp/mapreduce/client/operation.h +++ b/yt/cpp/mapreduce/client/operation.h @@ -1,6 +1,5 @@ #pragma once -#include "fwd.h" #include "structured_table_formats.h" #include "operation_preparer.h" @@ -181,7 +180,6 @@ void ExecuteVanilla( EOperationBriefState CheckOperation( const IRawClientPtr& rawClient, const IClientRetryPolicyPtr& clientRetryPolicy, - const TClientContext& context, const TOperationId& operationId); void WaitForOperation( diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 2229486a014..d21b3a937a8 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -150,6 +150,37 @@ public: const TOperationId& operationId, const TUpdateOperationParametersOptions& options = {}) = 0; + virtual NYson::TYsonString GetJob( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& options = {}) = 0; + + virtual TListJobsResult ListJobs( + const TOperationId& operationId, + const TListJobsOptions& options = {}) = 0; + + virtual IFileReaderPtr GetJobInput( + const TJobId& jobId, + const TGetJobInputOptions& options = {}) = 0; + + virtual IFileReaderPtr GetJobFailContext( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobFailContextOptions& options = {}) = 0; + + virtual TString GetJobStderrWithRetries( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& options = {}) = 0; + + virtual IFileReaderPtr GetJobStderr( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& options = {}) = 0; + + virtual std::vector<TJobTraceEvent> GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options = {}) = 0; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 7bc83b8ebc2..d95a513545d 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -5,11 +5,13 @@ #include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/http/helpers.h> #include <yt/cpp/mapreduce/http/http.h> #include <yt/cpp/mapreduce/http/requests.h> #include <yt/cpp/mapreduce/http/retry_request.h> #include <yt/cpp/mapreduce/interface/operation.h> +#include <yt/cpp/mapreduce/interface/tvm.h> #include <library/cpp/yson/node/node_io.h> @@ -99,7 +101,6 @@ TNodeId THttpRawClient::Create( return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response); } - TNodeId THttpRawClient::CopyWithoutRetries( const TTransactionId& transactionId, const TYPath& sourcePath, @@ -375,6 +376,184 @@ void THttpRawClient::UpdateOperationParameters( RequestWithoutRetry(Context_, mutationId, header); } +NYson::TYsonString THttpRawClient::GetJob( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "get_job"); + header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options)); + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); + return NYson::TYsonString(responseInfo.Response); +} + +TListJobsResult THttpRawClient::ListJobs( + const TOperationId& operationId, + const TListJobsOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "list_jobs"); + header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options)); + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + const auto& jobNodesList = resultNode["jobs"].AsList(); + + TListJobsResult result; + result.Jobs.reserve(jobNodesList.size()); + for (const auto& jobNode : jobNodesList) { + result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode)); + } + + if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) { + result.CypressJobCount = resultNode["cypress_job_count"].AsInt64(); + } + if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) { + result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64(); + } + if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) { + result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64(); + } + + return result; +} + +class TResponseReader + : public IFileReader +{ +public: + TResponseReader(const TClientContext& context, THttpHeader header) + { + if (context.ServiceTicketAuth) { + header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); + } else { + header.SetToken(context.Token); + } + + if (context.ImpersonationUser) { + header.SetImpersonationUser(*context.ImpersonationUser); + } + + auto hostName = GetProxyForHeavyRequest(context); + auto requestId = CreateGuidAsString(); + + UpdateHeaderForProxyIfNeed(hostName, context, header); + + Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); + ResponseStream_ = Response_->GetResponseStream(); + } + +private: + size_t DoRead(void* buf, size_t len) override + { + return ResponseStream_->Read(buf, len); + } + + size_t DoSkip(size_t len) override + { + return ResponseStream_->Skip(len); + } + +private: + NHttpClient::IHttpResponsePtr Response_; + IInputStream* ResponseStream_; +}; + +IFileReaderPtr THttpRawClient::GetJobInput( + const TJobId& jobId, + const TGetJobInputOptions& /*options*/) +{ + THttpHeader header("GET", "get_job_input"); + header.AddParameter("job_id", GetGuidAsString(jobId)); + return new TResponseReader(Context_, std::move(header)); +} + +IFileReaderPtr THttpRawClient::GetJobFailContext( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobFailContextOptions& /*options*/) +{ + THttpHeader header("GET", "get_job_fail_context"); + header.AddOperationId(operationId); + header.AddParameter("job_id", GetGuidAsString(jobId)); + return new TResponseReader(Context_, std::move(header)); +} + +TString THttpRawClient::GetJobStderrWithRetries( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& /*options*/) +{ + TMutationId mutationId; + THttpHeader header("GET", "get_job_stderr"); + header.AddOperationId(operationId); + header.AddParameter("job_id", GetGuidAsString(jobId)); + TRequestConfig config; + config.IsHeavy = true; + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, {}, config); + return responseInfo.Response; +} + +IFileReaderPtr THttpRawClient::GetJobStderr( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& /*options*/) +{ + THttpHeader header("GET", "get_job_stderr"); + header.AddOperationId(operationId); + header.AddParameter("job_id", GetGuidAsString(jobId)); + return new TResponseReader(Context_, std::move(header)); +} + +TJobTraceEvent ParseJobTraceEvent(const TNode& node) +{ + const auto& mapNode = node.AsMap(); + TJobTraceEvent result; + + if (auto idNode = mapNode.FindPtr("operation_id")) { + result.OperationId = GetGuid(idNode->AsString()); + } + if (auto idNode = mapNode.FindPtr("job_id")) { + result.JobId = GetGuid(idNode->AsString()); + } + if (auto idNode = mapNode.FindPtr("trace_id")) { + result.TraceId = GetGuid(idNode->AsString()); + } + if (auto eventIndexNode = mapNode.FindPtr("event_index")) { + result.EventIndex = eventIndexNode->AsInt64(); + } + if (auto eventNode = mapNode.FindPtr("event")) { + result.Event = eventNode->AsString(); + } + if (auto eventTimeNode = mapNode.FindPtr("event_time")) { + result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());; + } + + return result; +} + +std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options) +{ + TMutationId mutationId; + THttpHeader header("GET", "get_job_trace"); + header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options)); + auto responseInfo = RequestWithoutRetry(Context_, mutationId, header); + auto resultNode = NodeFromYsonString(responseInfo.Response); + + const auto& traceEventNodesList = resultNode.AsList(); + + std::vector<TJobTraceEvent> result; + result.reserve(traceEventNodesList.size()); + for (const auto& traceEventNode : traceEventNodesList) { + result.push_back(ParseJobTraceEvent(traceEventNode)); + } + + return result; +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index c3771497fba..77f10c1c651 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -154,6 +154,40 @@ public: const TOperationId& operationId, const TUpdateOperationParametersOptions& options = {}) override; + // Jobs + + NYson::TYsonString GetJob( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobOptions& options = {}) override; + + TListJobsResult ListJobs( + const TOperationId& operationId, + const TListJobsOptions& options = {}) override; + + IFileReaderPtr GetJobInput( + const TJobId& jobId, + const TGetJobInputOptions& options = {}) override; + + IFileReaderPtr GetJobFailContext( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobFailContextOptions& options = {}) override; + + TString GetJobStderrWithRetries( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& options = {}) override; + + IFileReaderPtr GetJobStderr( + const TOperationId& operationId, + const TJobId& jobId, + const TGetJobStderrOptions& options = {}) override; + + std::vector<TJobTraceEvent> GetJobTrace( + const TOperationId& operationId, + const TGetJobTraceOptions& options = {}) override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index bb69728cb2e..31f09339756 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -269,192 +269,6 @@ TJobAttributes ParseJobAttributes(const TNode& node) return result; } -TJobAttributes GetJob( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobOptions& options) -{ - THttpHeader header("GET", "get_job"); - header.MergeParameters(SerializeParamsForGetJob(operationId, jobId, options)); - auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); - return ParseJobAttributes(resultNode); -} - -TListJobsResult ListJobs( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TListJobsOptions& options) -{ - THttpHeader header("GET", "list_jobs"); - header.MergeParameters(SerializeParamsForListJobs(operationId, options)); - auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); - - TListJobsResult result; - - const auto& jobNodesList = resultNode["jobs"].AsList(); - result.Jobs.reserve(jobNodesList.size()); - for (const auto& jobNode : jobNodesList) { - result.Jobs.push_back(ParseJobAttributes(jobNode)); - } - - if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) { - result.CypressJobCount = resultNode["cypress_job_count"].AsInt64(); - } - if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) { - result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64(); - } - if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) { - result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64(); - } - - return result; -} - -class TResponseReader - : public IFileReader -{ -public: - TResponseReader(const TClientContext& context, THttpHeader header) - { - if (context.ServiceTicketAuth) { - header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket()); - } else { - header.SetToken(context.Token); - } - - if (context.ImpersonationUser) { - header.SetImpersonationUser(*context.ImpersonationUser); - } - - auto hostName = GetProxyForHeavyRequest(context); - auto requestId = CreateGuidAsString(); - - UpdateHeaderForProxyIfNeed(hostName, context, header); - - Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header); - ResponseStream_ = Response_->GetResponseStream(); - } - -private: - size_t DoRead(void* buf, size_t len) override - { - return ResponseStream_->Read(buf, len); - } - - size_t DoSkip(size_t len) override - { - return ResponseStream_->Skip(len); - } - -private: - NHttpClient::IHttpResponsePtr Response_; - IInputStream* ResponseStream_; -}; - -IFileReaderPtr GetJobInput( - const TClientContext& context, - const TJobId& jobId, - const TGetJobInputOptions& /* options */) -{ - THttpHeader header("GET", "get_job_input"); - header.AddParameter("job_id", GetGuidAsString(jobId)); - return new TResponseReader(context, std::move(header)); -} - -IFileReaderPtr GetJobFailContext( - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobFailContextOptions& /* options */) -{ - THttpHeader header("GET", "get_job_fail_context"); - header.AddOperationId(operationId); - header.AddParameter("job_id", GetGuidAsString(jobId)); - return new TResponseReader(context, std::move(header)); -} - -TString GetJobStderrWithRetries( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobStderrOptions& /* options */) -{ - THttpHeader header("GET", "get_job_stderr"); - header.AddOperationId(operationId); - header.AddParameter("job_id", GetGuidAsString(jobId)); - TRequestConfig config; - config.IsHeavy = true; - auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header, {}, config); - return responseInfo.Response; -} - -IFileReaderPtr GetJobStderr( - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobStderrOptions& /* options */) -{ - THttpHeader header("GET", "get_job_stderr"); - header.AddOperationId(operationId); - header.AddParameter("job_id", GetGuidAsString(jobId)); - return new TResponseReader(context, std::move(header)); -} - -TJobTraceEvent ParseJobTraceEvent(const TNode& node) -{ - const auto& mapNode = node.AsMap(); - TJobTraceEvent result; - - if (auto idNode = mapNode.FindPtr("operation_id")) { - result.OperationId = GetGuid(idNode->AsString()); - } - if (auto idNode = mapNode.FindPtr("job_id")) { - result.JobId = GetGuid(idNode->AsString()); - } - if (auto idNode = mapNode.FindPtr("trace_id")) { - result.TraceId = GetGuid(idNode->AsString()); - } - if (auto eventIndexNode = mapNode.FindPtr("event_index")) { - result.EventIndex = eventIndexNode->AsInt64(); - } - if (auto eventNode = mapNode.FindPtr("event")) { - result.Event = eventNode->AsString(); - } - if (auto eventTimeNode = mapNode.FindPtr("event_time")) { - result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());; - } - - return result; -} - -std::vector<TJobTraceEvent> GetJobTrace( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TGetJobTraceOptions& options) -{ - THttpHeader header("GET", "get_job_trace"); - header.MergeParameters(SerializeParamsForGetJobTrace(operationId, options)); - auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header); - auto resultNode = NodeFromYsonString(responseInfo.Response); - - std::vector<TJobTraceEvent> result; - - const auto& traceEventNodesList = resultNode.AsList(); - result.reserve(traceEventNodesList.size()); - for (const auto& traceEventNode : traceEventNodesList) { - result.push_back(ParseJobTraceEvent(traceEventNode)); - } - - return result; -} - TMaybe<TYPath> GetFileFromCache( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 28af3ee9896..494f145fae4 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -23,6 +23,8 @@ namespace NDetail::NRawClient { TOperationAttributes ParseOperationAttributes(const TNode& node); +TJobAttributes ParseJobAttributes(const TNode& node); + TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); //////////////////////////////////////////////////////////////////////////////// @@ -36,53 +38,6 @@ void ExecuteBatch( const TExecuteBatchOptions& options = TExecuteBatchOptions()); // -// Jobs -// - -TJobAttributes GetJob( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobOptions& options = TGetJobOptions()); - -TListJobsResult ListJobs( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TListJobsOptions& options = TListJobsOptions()); - -::TIntrusivePtr<IFileReader> GetJobInput( - const TClientContext& context, - const TJobId& jobId, - const TGetJobInputOptions& options = TGetJobInputOptions()); - -::TIntrusivePtr<IFileReader> GetJobFailContext( - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobFailContextOptions& options = TGetJobFailContextOptions()); - -TString GetJobStderrWithRetries( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobStderrOptions& /* options */ = TGetJobStderrOptions()); - -::TIntrusivePtr<IFileReader> GetJobStderr( - const TClientContext& context, - const TOperationId& operationId, - const TJobId& jobId, - const TGetJobStderrOptions& options = TGetJobStderrOptions()); - -std::vector<TJobTraceEvent> GetJobTrace( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TOperationId& operationId, - const TGetJobTraceOptions& options = TGetJobTraceOptions()); - -// // File cache // |
