summaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-17 01:13:52 +0300
committerhiddenpath <[email protected]>2024-12-17 02:03:47 +0300
commit5935906b0bfd05ea9cf84fc03e1b7d8befd2ff11 (patch)
tree9f72ba6c791e8374a21699eb820767c9fbaa16a8 /yt/cpp
parentb570317a503ddb08ae344d96997c4ebb45002b8d (diff)
[yt/cpp/mapreduce] YT-23616: Move Job methods to THttpRawClient
commit_hash:bd11304f4147ff314372d4ab6049478143f60fd5
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/client/client.cpp31
-rw-r--r--yt/cpp/mapreduce/client/file_reader.cpp4
-rw-r--r--yt/cpp/mapreduce/client/fwd.h16
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp106
-rw-r--r--yt/cpp/mapreduce/client/operation.h2
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h31
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp181
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h34
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp186
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h49
10 files changed, 329 insertions, 311 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 66cacf6b456..3177ea9d6a5 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -53,8 +53,6 @@
#include <util/string/type.h>
#include <util/system/env.h>
-using namespace NYT::NDetail::NRawClient;
-
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
@@ -711,7 +709,7 @@ IOperationPtr TClientBase::AttachOperation(const TOperationId& operationId)
EOperationBriefState TClientBase::CheckOperation(const TOperationId& operationId)
{
- return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, Context_, operationId);
+ return NYT::NDetail::CheckOperation(RawClient_, ClientRetryPolicy_, operationId);
}
void TClientBase::AbortOperation(const TOperationId& operationId)
@@ -1138,7 +1136,7 @@ void TClient::InsertRows(
THttpHeader header("PUT", "insert_rows");
header.SetInputFormat(TFormat::YsonBinary());
// TODO: use corresponding raw request
- header.MergeParameters(SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
+ header.MergeParameters(NRawClient::SerializeParametersForInsertRows(Context_.Config->Prefix, path, options));
auto body = NodeListToYsonString(rows);
TRequestConfig config;
@@ -1324,7 +1322,12 @@ TJobAttributes TClient::GetJob(
const TGetJobOptions& options)
{
CheckShutdown();
- return NRawClient::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, jobId, options);
+ auto result = RequestWithRetry<NYson::TYsonString>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId, &jobId, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetJob(operationId, jobId, options);
+ });
+ return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf()));
}
TListJobsResult TClient::ListJobs(
@@ -1332,7 +1335,11 @@ TListJobsResult TClient::ListJobs(
const TListJobsOptions& options)
{
CheckShutdown();
- return NRawClient::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+ return RequestWithRetry<TListJobsResult>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->ListJobs(operationId, options);
+ });
}
IFileReaderPtr TClient::GetJobInput(
@@ -1340,7 +1347,7 @@ IFileReaderPtr TClient::GetJobInput(
const TGetJobInputOptions& options)
{
CheckShutdown();
- return NRawClient::GetJobInput(Context_, jobId, options);
+ return RawClient_->GetJobInput(jobId, options);
}
IFileReaderPtr TClient::GetJobFailContext(
@@ -1349,7 +1356,7 @@ IFileReaderPtr TClient::GetJobFailContext(
const TGetJobFailContextOptions& options)
{
CheckShutdown();
- return NRawClient::GetJobFailContext(Context_, operationId, jobId, options);
+ return RawClient_->GetJobFailContext(operationId, jobId, options);
}
IFileReaderPtr TClient::GetJobStderr(
@@ -1358,7 +1365,7 @@ IFileReaderPtr TClient::GetJobStderr(
const TGetJobStderrOptions& options)
{
CheckShutdown();
- return NRawClient::GetJobStderr(Context_, operationId, jobId, options);
+ return RawClient_->GetJobStderr(operationId, jobId, options);
}
std::vector<TJobTraceEvent> TClient::GetJobTrace(
@@ -1366,7 +1373,11 @@ std::vector<TJobTraceEvent> TClient::GetJobTrace(
const TGetJobTraceOptions& options)
{
CheckShutdown();
- return NRawClient::GetJobTrace(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, operationId, options);
+ return RequestWithRetry<std::vector<TJobTraceEvent>>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &operationId, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetJobTrace(operationId, options);
+ });
}
TNode::TListType TClient::SkyShareTable(
diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp
index 408650bda90..06463d0af24 100644
--- a/yt/cpp/mapreduce/client/file_reader.cpp
+++ b/yt/cpp/mapreduce/client/file_reader.cpp
@@ -98,7 +98,7 @@ size_t TStreamReaderBase::DoRead(void* buf, size_t len)
if (!IsRetriable(e) || attempt == retryCount) {
throw;
}
- NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
+ TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
} catch (std::exception& e) {
YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
GetActiveRequestId(),
@@ -112,7 +112,7 @@ size_t TStreamReaderBase::DoRead(void* buf, size_t len)
if (attempt == retryCount) {
throw;
}
- NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
+ TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
}
Input_ = nullptr;
}
diff --git a/yt/cpp/mapreduce/client/fwd.h b/yt/cpp/mapreduce/client/fwd.h
deleted file mode 100644
index d4449d4ac10..00000000000
--- a/yt/cpp/mapreduce/client/fwd.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#pragma once
-
-#include <util/generic/ptr.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TPingableTransaction;
-
-class TClient;
-using TClientPtr = ::TIntrusivePtr<TClient>;
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 9f8c14647a5..98ed246e0e3 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -54,8 +54,6 @@
namespace NYT {
namespace NDetail {
-using namespace NRawClient;
-
using ::ToString;
////////////////////////////////////////////////////////////////////////////////
@@ -226,11 +224,11 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
return tableList;
}
- auto isDynamic = BatchTransform(
+ auto isDynamic = NRawClient::BatchTransform(
CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
preparer.GetContext(),
tableList,
- [&] (TRawBatchRequest& batch, const auto& table) {
+ [&] (NRawClient::TRawBatchRequest& batch, const auto& table) {
return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
});
@@ -306,8 +304,8 @@ TSimpleOperationIo CreateSimpleOperationIo(
}
};
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs());
- auto outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs());
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs());
+ auto outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs());
VerifyHasElements(inputs, "input");
VerifyHasElements(outputs, "output");
@@ -349,20 +347,19 @@ TSimpleOperationIo CreateSimpleOperationIo(
TString GetJobStderrWithRetriesAndIgnoreErrors(
const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TOperationId& operationId,
const TJobId& jobId,
const size_t stderrTailSize,
- const TGetJobStderrOptions& options = TGetJobStderrOptions())
+ const TGetJobStderrOptions& options = {})
{
TString jobStderr;
try {
- jobStderr = GetJobStderrWithRetries(
+ jobStderr = RequestWithRetry<TString>(
retryPolicy,
- context,
- operationId,
- jobId,
- options);
+ [&rawClient, &operationId, &jobId, &options] (TMutationId /*mutationId*/) {
+ return rawClient->GetJobStderrWithRetries(operationId, jobId, options);
+ });
} catch (const TErrorResponse& e) {
YT_LOG_ERROR("Cannot get job stderr (OperationId: %v, JobId: %v, Error: %v)",
operationId,
@@ -377,17 +374,19 @@ TString GetJobStderrWithRetriesAndIgnoreErrors(
TVector<TFailedJobInfo> GetFailedJobInfo(
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TOperationId& operationId,
const TGetFailedJobInfoOptions& options)
{
- const auto listJobsResult = ListJobs(
+ const auto listJobsResult = RequestWithRetry<TListJobsResult>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- operationId,
- TListJobsOptions()
- .State(EJobState::Failed)
- .Limit(options.MaxJobCount_));
+ [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) {
+ return rawClient->ListJobs(
+ operationId,
+ TListJobsOptions()
+ .State(EJobState::Failed)
+ .Limit(options.MaxJobCount_));
+ });
const auto stderrTailSize = options.StderrTailSize_;
@@ -405,7 +404,7 @@ TVector<TFailedJobInfo> GetFailedJobInfo(
// so we ignore all errors and try our luck on other jobs.
info.Stderr = GetJobStderrWithRetriesAndIgnoreErrors(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
+ rawClient,
operationId,
*job.Id,
stderrTailSize);
@@ -427,15 +426,20 @@ struct TGetJobsStderrOptions
static TVector<TString> GetJobsStderr(
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TOperationId& operationId,
- const TGetJobsStderrOptions& options = TGetJobsStderrOptions())
+ const TGetJobsStderrOptions& options = {})
{
- const auto listJobsResult = ListJobs(
+ const auto listJobsResult = RequestWithRetry<TListJobsResult>(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- operationId,
- TListJobsOptions().Limit(options.MaxJobCount_).WithStderr(true));
+ [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) {
+ return rawClient->ListJobs(
+ operationId,
+ TListJobsOptions()
+ .Limit(options.MaxJobCount_)
+ .WithStderr(true));
+ });
+
const auto stderrTailSize = options.StderrTailSize_;
TVector<TString> result;
for (const auto& job : listJobsResult.Jobs) {
@@ -447,7 +451,7 @@ static TVector<TString> GetJobsStderr(
// so we ignore all errors and try our luck on other jobs.
GetJobStderrWithRetriesAndIgnoreErrors(
clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
+ rawClient,
operationId,
*job.Id,
stderrTailSize)
@@ -554,7 +558,6 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
EOperationBriefState CheckOperation(
const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
const TOperationId& operationId)
{
auto attributes = RequestWithRetry<TOperationAttributes>(
@@ -580,7 +583,7 @@ EOperationBriefState CheckOperation(
auto failedJobInfoList = GetFailedJobInfo(
clientRetryPolicy,
- context,
+ rawClient,
operationId,
TGetFailedJobInfoOptions());
@@ -608,7 +611,7 @@ void WaitForOperation(
: context.Config->OperationTrackerPollPeriod;
while (true) {
- auto status = CheckOperation(rawClient, clientRetryPolicy, context, operationId);
+ auto status = CheckOperation(rawClient, clientRetryPolicy, operationId);
if (status == EOperationBriefState::Completed) {
YT_LOG_INFO("Operation %v completed (%v)",
operationId,
@@ -1901,9 +1904,9 @@ void ExecuteRawMapReduce(
YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)",
preparer->GetPreparationId());
TMapReduceOperationIo operationIo;
- operationIo.Inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs());
- operationIo.MapOutputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs());
- operationIo.Outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs());
+ operationIo.Inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs());
+ operationIo.MapOutputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs());
+ operationIo.Outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs());
VerifyHasElements(operationIo.Inputs, "inputs");
VerifyHasElements(operationIo.Outputs, "outputs");
@@ -1950,8 +1953,8 @@ void ExecuteSort(
{
YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
@@ -1999,8 +2002,8 @@ void ExecuteMerge(
{
YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
@@ -2049,7 +2052,7 @@ void ExecuteErase(
{
YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto tablePath = CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
+ auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
TNode specNode = BuildYsonNodeFluently()
.BeginMap().Item("spec").BeginMap()
@@ -2085,8 +2088,8 @@ void ExecuteRemoteCopy(
{
YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
if (options.CreateOutputTables_) {
CreateOutputTable(*preparer, output);
@@ -2340,7 +2343,7 @@ public:
: OperationImpl_(std::move(operationImpl))
{ }
- void PrepareRequest(TRawBatchRequest* batchRequest) override
+ void PrepareRequest(NRawClient::TRawBatchRequest* batchRequest) override
{
auto filter = TOperationAttributeFilter()
.Add(EOperationAttribute::State)
@@ -2744,13 +2747,22 @@ void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParamete
TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, jobId, options);
+ auto result = RequestWithRetry<NYson::TYsonString>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &jobId, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->GetJob(*Id_, jobId, options);
+ });
+ return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf()));
}
TListJobsResult TOperation::TOperationImpl::ListJobs(const TListJobsOptions& options)
{
ValidateOperationStarted();
- return NYT::NDetail::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
+ return RequestWithRetry<TListJobsResult>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &options] (TMutationId /*mutationId*/) {
+ return RawClient_->ListJobs(*Id_, options);
+ });
}
struct TAsyncFinishOperationsArgs
@@ -2816,7 +2828,7 @@ void TOperation::TOperationImpl::SyncFinishOperationImpl(const TOperationAttribu
TVector<TFailedJobInfo> failedJobStderrInfo;
if (*attributes.BriefState == EOperationBriefState::Failed) {
try {
- failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, Context_, *Id_, TGetFailedJobInfoOptions());
+ failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, RawClient_, *Id_, TGetFailedJobInfoOptions());
} catch (const std::exception& e) {
additionalExceptionText = "Cannot get job stderrs: ";
additionalExceptionText += e.what();
@@ -2929,7 +2941,7 @@ void TOperation::OnStatusUpdated(const TString& newStatus)
TVector<TFailedJobInfo> TOperation::GetFailedJobInfo(const TGetFailedJobInfoOptions& options)
{
- return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetContext(), GetId(), options);
+ return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetRawClient(), GetId(), options);
}
EOperationBriefState TOperation::GetBriefState()
@@ -3051,7 +3063,7 @@ void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, co
TWaitProxy::Get()->WaitFuture(finishedFuture);
finishedFuture.GetValue();
if (context.Config->WriteStderrSuccessfulJobs) {
- auto stderrs = GetJobsStderr(retryPolicy, context, operation->GetId());
+ auto stderrs = GetJobsStderr(retryPolicy, client->GetRawClient(), operation->GetId());
for (const auto& jobStderr : stderrs) {
if (!jobStderr.empty()) {
Cerr << jobStderr << '\n';
diff --git a/yt/cpp/mapreduce/client/operation.h b/yt/cpp/mapreduce/client/operation.h
index e5e675a1f48..06eeac492fc 100644
--- a/yt/cpp/mapreduce/client/operation.h
+++ b/yt/cpp/mapreduce/client/operation.h
@@ -1,6 +1,5 @@
#pragma once
-#include "fwd.h"
#include "structured_table_formats.h"
#include "operation_preparer.h"
@@ -181,7 +180,6 @@ void ExecuteVanilla(
EOperationBriefState CheckOperation(
const IRawClientPtr& rawClient,
const IClientRetryPolicyPtr& clientRetryPolicy,
- const TClientContext& context,
const TOperationId& operationId);
void WaitForOperation(
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 2229486a014..d21b3a937a8 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -150,6 +150,37 @@ public:
const TOperationId& operationId,
const TUpdateOperationParametersOptions& options = {}) = 0;
+ virtual NYson::TYsonString GetJob(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& options = {}) = 0;
+
+ virtual TListJobsResult ListJobs(
+ const TOperationId& operationId,
+ const TListJobsOptions& options = {}) = 0;
+
+ virtual IFileReaderPtr GetJobInput(
+ const TJobId& jobId,
+ const TGetJobInputOptions& options = {}) = 0;
+
+ virtual IFileReaderPtr GetJobFailContext(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobFailContextOptions& options = {}) = 0;
+
+ virtual TString GetJobStderrWithRetries(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& options = {}) = 0;
+
+ virtual IFileReaderPtr GetJobStderr(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& options = {}) = 0;
+
+ virtual std::vector<TJobTraceEvent> GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = {}) = 0;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 7bc83b8ebc2..d95a513545d 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -5,11 +5,13 @@
#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/http/helpers.h>
#include <yt/cpp/mapreduce/http/http.h>
#include <yt/cpp/mapreduce/http/requests.h>
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <yt/cpp/mapreduce/interface/operation.h>
+#include <yt/cpp/mapreduce/interface/tvm.h>
#include <library/cpp/yson/node/node_io.h>
@@ -99,7 +101,6 @@ TNodeId THttpRawClient::Create(
return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header).Response);
}
-
TNodeId THttpRawClient::CopyWithoutRetries(
const TTransactionId& transactionId,
const TYPath& sourcePath,
@@ -375,6 +376,184 @@ void THttpRawClient::UpdateOperationParameters(
RequestWithoutRetry(Context_, mutationId, header);
}
+NYson::TYsonString THttpRawClient::GetJob(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job");
+ header.MergeParameters(NRawClient::SerializeParamsForGetJob(operationId, jobId, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ return NYson::TYsonString(responseInfo.Response);
+}
+
+TListJobsResult THttpRawClient::ListJobs(
+ const TOperationId& operationId,
+ const TListJobsOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "list_jobs");
+ header.MergeParameters(NRawClient::SerializeParamsForListJobs(operationId, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ const auto& jobNodesList = resultNode["jobs"].AsList();
+
+ TListJobsResult result;
+ result.Jobs.reserve(jobNodesList.size());
+ for (const auto& jobNode : jobNodesList) {
+ result.Jobs.push_back(NRawClient::ParseJobAttributes(jobNode));
+ }
+
+ if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
+ result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
+ }
+ if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
+ result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
+ }
+ if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
+ result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
+ }
+
+ return result;
+}
+
+class TResponseReader
+ : public IFileReader
+{
+public:
+ TResponseReader(const TClientContext& context, THttpHeader header)
+ {
+ if (context.ServiceTicketAuth) {
+ header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ } else {
+ header.SetToken(context.Token);
+ }
+
+ if (context.ImpersonationUser) {
+ header.SetImpersonationUser(*context.ImpersonationUser);
+ }
+
+ auto hostName = GetProxyForHeavyRequest(context);
+ auto requestId = CreateGuidAsString();
+
+ UpdateHeaderForProxyIfNeed(hostName, context, header);
+
+ Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
+ ResponseStream_ = Response_->GetResponseStream();
+ }
+
+private:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ return ResponseStream_->Read(buf, len);
+ }
+
+ size_t DoSkip(size_t len) override
+ {
+ return ResponseStream_->Skip(len);
+ }
+
+private:
+ NHttpClient::IHttpResponsePtr Response_;
+ IInputStream* ResponseStream_;
+};
+
+IFileReaderPtr THttpRawClient::GetJobInput(
+ const TJobId& jobId,
+ const TGetJobInputOptions& /*options*/)
+{
+ THttpHeader header("GET", "get_job_input");
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ return new TResponseReader(Context_, std::move(header));
+}
+
+IFileReaderPtr THttpRawClient::GetJobFailContext(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobFailContextOptions& /*options*/)
+{
+ THttpHeader header("GET", "get_job_fail_context");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ return new TResponseReader(Context_, std::move(header));
+}
+
+TString THttpRawClient::GetJobStderrWithRetries(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& /*options*/)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job_stderr");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, {}, config);
+ return responseInfo.Response;
+}
+
+IFileReaderPtr THttpRawClient::GetJobStderr(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& /*options*/)
+{
+ THttpHeader header("GET", "get_job_stderr");
+ header.AddOperationId(operationId);
+ header.AddParameter("job_id", GetGuidAsString(jobId));
+ return new TResponseReader(Context_, std::move(header));
+}
+
+TJobTraceEvent ParseJobTraceEvent(const TNode& node)
+{
+ const auto& mapNode = node.AsMap();
+ TJobTraceEvent result;
+
+ if (auto idNode = mapNode.FindPtr("operation_id")) {
+ result.OperationId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("job_id")) {
+ result.JobId = GetGuid(idNode->AsString());
+ }
+ if (auto idNode = mapNode.FindPtr("trace_id")) {
+ result.TraceId = GetGuid(idNode->AsString());
+ }
+ if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
+ result.EventIndex = eventIndexNode->AsInt64();
+ }
+ if (auto eventNode = mapNode.FindPtr("event")) {
+ result.Event = eventNode->AsString();
+ }
+ if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
+ result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
+ }
+
+ return result;
+}
+
+std::vector<TJobTraceEvent> THttpRawClient::GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options)
+{
+ TMutationId mutationId;
+ THttpHeader header("GET", "get_job_trace");
+ header.MergeParameters(NRawClient::SerializeParamsForGetJobTrace(operationId, options));
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header);
+ auto resultNode = NodeFromYsonString(responseInfo.Response);
+
+ const auto& traceEventNodesList = resultNode.AsList();
+
+ std::vector<TJobTraceEvent> result;
+ result.reserve(traceEventNodesList.size());
+ for (const auto& traceEventNode : traceEventNodesList) {
+ result.push_back(ParseJobTraceEvent(traceEventNode));
+ }
+
+ return result;
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index c3771497fba..77f10c1c651 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -154,6 +154,40 @@ public:
const TOperationId& operationId,
const TUpdateOperationParametersOptions& options = {}) override;
+ // Jobs
+
+ NYson::TYsonString GetJob(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobOptions& options = {}) override;
+
+ TListJobsResult ListJobs(
+ const TOperationId& operationId,
+ const TListJobsOptions& options = {}) override;
+
+ IFileReaderPtr GetJobInput(
+ const TJobId& jobId,
+ const TGetJobInputOptions& options = {}) override;
+
+ IFileReaderPtr GetJobFailContext(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobFailContextOptions& options = {}) override;
+
+ TString GetJobStderrWithRetries(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& options = {}) override;
+
+ IFileReaderPtr GetJobStderr(
+ const TOperationId& operationId,
+ const TJobId& jobId,
+ const TGetJobStderrOptions& options = {}) override;
+
+ std::vector<TJobTraceEvent> GetJobTrace(
+ const TOperationId& operationId,
+ const TGetJobTraceOptions& options = {}) override;
+
private:
const TClientContext Context_;
};
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index bb69728cb2e..31f09339756 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -269,192 +269,6 @@ TJobAttributes ParseJobAttributes(const TNode& node)
return result;
}
-TJobAttributes GetJob(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobOptions& options)
-{
- THttpHeader header("GET", "get_job");
- header.MergeParameters(SerializeParamsForGetJob(operationId, jobId, options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
- return ParseJobAttributes(resultNode);
-}
-
-TListJobsResult ListJobs(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TListJobsOptions& options)
-{
- THttpHeader header("GET", "list_jobs");
- header.MergeParameters(SerializeParamsForListJobs(operationId, options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
-
- TListJobsResult result;
-
- const auto& jobNodesList = resultNode["jobs"].AsList();
- result.Jobs.reserve(jobNodesList.size());
- for (const auto& jobNode : jobNodesList) {
- result.Jobs.push_back(ParseJobAttributes(jobNode));
- }
-
- if (resultNode.HasKey("cypress_job_count") && !resultNode["cypress_job_count"].IsNull()) {
- result.CypressJobCount = resultNode["cypress_job_count"].AsInt64();
- }
- if (resultNode.HasKey("controller_agent_job_count") && !resultNode["controller_agent_job_count"].IsNull()) {
- result.ControllerAgentJobCount = resultNode["scheduler_job_count"].AsInt64();
- }
- if (resultNode.HasKey("archive_job_count") && !resultNode["archive_job_count"].IsNull()) {
- result.ArchiveJobCount = resultNode["archive_job_count"].AsInt64();
- }
-
- return result;
-}
-
-class TResponseReader
- : public IFileReader
-{
-public:
- TResponseReader(const TClientContext& context, THttpHeader header)
- {
- if (context.ServiceTicketAuth) {
- header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
- } else {
- header.SetToken(context.Token);
- }
-
- if (context.ImpersonationUser) {
- header.SetImpersonationUser(*context.ImpersonationUser);
- }
-
- auto hostName = GetProxyForHeavyRequest(context);
- auto requestId = CreateGuidAsString();
-
- UpdateHeaderForProxyIfNeed(hostName, context, header);
-
- Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
- ResponseStream_ = Response_->GetResponseStream();
- }
-
-private:
- size_t DoRead(void* buf, size_t len) override
- {
- return ResponseStream_->Read(buf, len);
- }
-
- size_t DoSkip(size_t len) override
- {
- return ResponseStream_->Skip(len);
- }
-
-private:
- NHttpClient::IHttpResponsePtr Response_;
- IInputStream* ResponseStream_;
-};
-
-IFileReaderPtr GetJobInput(
- const TClientContext& context,
- const TJobId& jobId,
- const TGetJobInputOptions& /* options */)
-{
- THttpHeader header("GET", "get_job_input");
- header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(context, std::move(header));
-}
-
-IFileReaderPtr GetJobFailContext(
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobFailContextOptions& /* options */)
-{
- THttpHeader header("GET", "get_job_fail_context");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(context, std::move(header));
-}
-
-TString GetJobStderrWithRetries(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& /* options */)
-{
- THttpHeader header("GET", "get_job_stderr");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- TRequestConfig config;
- config.IsHeavy = true;
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header, {}, config);
- return responseInfo.Response;
-}
-
-IFileReaderPtr GetJobStderr(
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& /* options */)
-{
- THttpHeader header("GET", "get_job_stderr");
- header.AddOperationId(operationId);
- header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(context, std::move(header));
-}
-
-TJobTraceEvent ParseJobTraceEvent(const TNode& node)
-{
- const auto& mapNode = node.AsMap();
- TJobTraceEvent result;
-
- if (auto idNode = mapNode.FindPtr("operation_id")) {
- result.OperationId = GetGuid(idNode->AsString());
- }
- if (auto idNode = mapNode.FindPtr("job_id")) {
- result.JobId = GetGuid(idNode->AsString());
- }
- if (auto idNode = mapNode.FindPtr("trace_id")) {
- result.TraceId = GetGuid(idNode->AsString());
- }
- if (auto eventIndexNode = mapNode.FindPtr("event_index")) {
- result.EventIndex = eventIndexNode->AsInt64();
- }
- if (auto eventNode = mapNode.FindPtr("event")) {
- result.Event = eventNode->AsString();
- }
- if (auto eventTimeNode = mapNode.FindPtr("event_time")) {
- result.EventTime = TInstant::ParseIso8601(eventTimeNode->AsString());;
- }
-
- return result;
-}
-
-std::vector<TJobTraceEvent> GetJobTrace(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TGetJobTraceOptions& options)
-{
- THttpHeader header("GET", "get_job_trace");
- header.MergeParameters(SerializeParamsForGetJobTrace(operationId, options));
- auto responseInfo = RetryRequestWithPolicy(retryPolicy, context, header);
- auto resultNode = NodeFromYsonString(responseInfo.Response);
-
- std::vector<TJobTraceEvent> result;
-
- const auto& traceEventNodesList = resultNode.AsList();
- result.reserve(traceEventNodesList.size());
- for (const auto& traceEventNode : traceEventNodesList) {
- result.push_back(ParseJobTraceEvent(traceEventNode));
- }
-
- return result;
-}
-
TMaybe<TYPath> GetFileFromCache(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 28af3ee9896..494f145fae4 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -23,6 +23,8 @@ namespace NDetail::NRawClient {
TOperationAttributes ParseOperationAttributes(const TNode& node);
+TJobAttributes ParseJobAttributes(const TNode& node);
+
TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node);
////////////////////////////////////////////////////////////////////////////////
@@ -36,53 +38,6 @@ void ExecuteBatch(
const TExecuteBatchOptions& options = TExecuteBatchOptions());
//
-// Jobs
-//
-
-TJobAttributes GetJob(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobOptions& options = TGetJobOptions());
-
-TListJobsResult ListJobs(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TListJobsOptions& options = TListJobsOptions());
-
-::TIntrusivePtr<IFileReader> GetJobInput(
- const TClientContext& context,
- const TJobId& jobId,
- const TGetJobInputOptions& options = TGetJobInputOptions());
-
-::TIntrusivePtr<IFileReader> GetJobFailContext(
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobFailContextOptions& options = TGetJobFailContextOptions());
-
-TString GetJobStderrWithRetries(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& /* options */ = TGetJobStderrOptions());
-
-::TIntrusivePtr<IFileReader> GetJobStderr(
- const TClientContext& context,
- const TOperationId& operationId,
- const TJobId& jobId,
- const TGetJobStderrOptions& options = TGetJobStderrOptions());
-
-std::vector<TJobTraceEvent> GetJobTrace(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TOperationId& operationId,
- const TGetJobTraceOptions& options = TGetJobTraceOptions());
-
-//
// File cache
//