summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-13 15:22:36 +0300
committerhiddenpath <[email protected]>2024-12-13 17:04:18 +0300
commit09c88b035d29fac5fd49de2fbc3c71e2d2a80754 (patch)
treea84b5b2de4dcdf85c3b22b9cac7e984aebb8b68d /yt/cpp/mapreduce/client/operation.cpp
parent615edba542d9394b0eae47ef957ec2257549cfdd (diff)
yt/cpp/mapreduce: move Get, TryGet, Exists, MultisetAttributes to THttpRawClient
commit_hash:bd2228f98fa92de408ca850f9bc1608fdf99e7f5
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp31
1 files changed, 23 insertions, 8 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 005bb507fa5..5bb67e3059f 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -319,6 +319,7 @@ TSimpleOperationIo CreateSimpleOperationIo(
TOperationPreparationContext(
inputs,
outputs,
+ preparer.GetClient()->GetRawClient(),
preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
@@ -491,6 +492,7 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
TOperationPreparationContext(
structuredInputs,
structuredOutputs,
+ preparer.GetClient()->GetRawClient(),
preparer.GetContext(),
preparer.GetClientRetryPolicy(),
preparer.GetTransactionId()),
@@ -499,7 +501,12 @@ TSimpleOperationIo CreateSimpleOperationIoHelper(
hints);
TVector<TSmallJobFile> formatConfigList;
- TFormatBuilder formatBuilder(preparer.GetClientRetryPolicy(), preparer.GetContext(), preparer.GetTransactionId(), options);
+ TFormatBuilder formatBuilder(
+ preparer.GetClient()->GetRawClient(),
+ preparer.GetClientRetryPolicy(),
+ preparer.GetContext(),
+ preparer.GetTransactionId(),
+ options);
auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
structuredJob,
@@ -587,11 +594,12 @@ EOperationBriefState CheckOperation(
void WaitForOperation(
const IClientRetryPolicyPtr& clientRetryPolicy,
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const TOperationId& operationId)
{
const TDuration checkOperationStateInterval =
- UseLocalModeOptimization(context, clientRetryPolicy)
+ UseLocalModeOptimization(rawClient, context, clientRetryPolicy)
? Min(TDuration::MilliSeconds(100), context.Config->OperationTrackerPollPeriod)
: context.Config->OperationTrackerPollPeriod;
@@ -1020,13 +1028,16 @@ void CheckInputTablesExist(
Y_ENSURE(!paths.empty(), "Input tables are not set");
for (auto& path : paths) {
auto curTransactionId = path.TransactionId_.GetOrElse(preparer.GetTransactionId());
+ auto exists = RequestWithRetry<bool>(
+ preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
+ [&preparer, &curTransactionId, &path] (TMutationId& mutationId) {
+ return preparer.GetClient()->GetRawClient()->Exists(
+ mutationId,
+ curTransactionId,
+ path.Path_);
+ });
Y_ENSURE_EX(
- path.Cluster_.Defined() ||
- Exists(
- preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- preparer.GetContext(),
- curTransactionId,
- path.Path_),
+ path.Cluster_.Defined() || exists,
TApiUsageError() << "Input table '" << path.Path_ << "' doesn't exist");
}
}
@@ -1633,6 +1644,7 @@ void ExecuteMapReduce(
VerifyHasElements(structuredInputs, "inputs");
TFormatBuilder formatBuilder(
+ preparer->GetClient()->GetRawClient(),
preparer->GetClientRetryPolicy(),
preparer->GetContext(),
preparer->GetTransactionId(),
@@ -1657,6 +1669,7 @@ void ExecuteMapReduce(
TOperationPreparationContext(
structuredInputs,
mapperOutput,
+ preparer->GetClient()->GetRawClient(),
preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
@@ -1726,6 +1739,7 @@ void ExecuteMapReduce(
TOperationPreparationContext(
inputs,
outputs,
+ preparer->GetClient()->GetRawClient(),
preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),
@@ -1791,6 +1805,7 @@ void ExecuteMapReduce(
TOperationPreparationContext(
structuredInputs,
structuredOutputs,
+ preparer->GetClient()->GetRawClient(),
preparer->GetContext(),
preparer->GetClientRetryPolicy(),
preparer->GetTransactionId()),