diff options
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 005bb507fa5..5bb67e3059f 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -319,6 +319,7 @@ TSimpleOperationIo CreateSimpleOperationIo( TOperationPreparationContext( inputs, outputs, + preparer.GetClient()->GetRawClient(), preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), @@ -491,6 +492,7 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( TOperationPreparationContext( structuredInputs, structuredOutputs, + preparer.GetClient()->GetRawClient(), preparer.GetContext(), preparer.GetClientRetryPolicy(), preparer.GetTransactionId()), @@ -499,7 +501,12 @@ TSimpleOperationIo CreateSimpleOperationIoHelper( hints); TVector<TSmallJobFile> formatConfigList; - TFormatBuilder formatBuilder(preparer.GetClientRetryPolicy(), preparer.GetContext(), preparer.GetTransactionId(), options); + TFormatBuilder formatBuilder( + preparer.GetClient()->GetRawClient(), + preparer.GetClientRetryPolicy(), + preparer.GetContext(), + preparer.GetTransactionId(), + options); auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat( structuredJob, @@ -587,11 +594,12 @@ EOperationBriefState CheckOperation( void WaitForOperation( const IClientRetryPolicyPtr& clientRetryPolicy, + const IRawClientPtr& rawClient, const TClientContext& context, const TOperationId& operationId) { const TDuration checkOperationStateInterval = - UseLocalModeOptimization(context, clientRetryPolicy) + UseLocalModeOptimization(rawClient, context, clientRetryPolicy) ? Min(TDuration::MilliSeconds(100), context.Config->OperationTrackerPollPeriod) : context.Config->OperationTrackerPollPeriod; @@ -1020,13 +1028,16 @@ void CheckInputTablesExist( Y_ENSURE(!paths.empty(), "Input tables are not set"); for (auto& path : paths) { auto curTransactionId = path.TransactionId_.GetOrElse(preparer.GetTransactionId()); + auto exists = RequestWithRetry<bool>( + preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), + [&preparer, &curTransactionId, &path] (TMutationId& mutationId) { + return preparer.GetClient()->GetRawClient()->Exists( + mutationId, + curTransactionId, + path.Path_); + }); Y_ENSURE_EX( - path.Cluster_.Defined() || - Exists( - preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(), - preparer.GetContext(), - curTransactionId, - path.Path_), + path.Cluster_.Defined() || exists, TApiUsageError() << "Input table '" << path.Path_ << "' doesn't exist"); } } @@ -1633,6 +1644,7 @@ void ExecuteMapReduce( VerifyHasElements(structuredInputs, "inputs"); TFormatBuilder formatBuilder( + preparer->GetClient()->GetRawClient(), preparer->GetClientRetryPolicy(), preparer->GetContext(), preparer->GetTransactionId(), @@ -1657,6 +1669,7 @@ void ExecuteMapReduce( TOperationPreparationContext( structuredInputs, mapperOutput, + preparer->GetClient()->GetRawClient(), preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), @@ -1726,6 +1739,7 @@ void ExecuteMapReduce( TOperationPreparationContext( inputs, outputs, + preparer->GetClient()->GetRawClient(), preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), @@ -1791,6 +1805,7 @@ void ExecuteMapReduce( TOperationPreparationContext( structuredInputs, structuredOutputs, + preparer->GetClient()->GetRawClient(), preparer->GetContext(), preparer->GetClientRetryPolicy(), preparer->GetTransactionId()), |
