diff options
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_requests.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 60 |
1 files changed, 1 insertions, 59 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 601fe887b48..462cf84b411 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -30,64 +30,6 @@ namespace NYT::NDetail::NRawClient { //////////////////////////////////////////////////////////////////////////////// -void ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - TRawBatchRequest& batchRequest, - const TExecuteBatchOptions& options) -{ - if (batchRequest.IsExecuted()) { - ythrow yexception() << "Cannot execute batch request since it is already executed"; - } - Y_DEFER { - batchRequest.MarkExecuted(); - }; - - const auto concurrency = options.Concurrency_.GetOrElse(50); - const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); - - if (!retryPolicy) { - retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); - } - - while (batchRequest.BatchSize()) { - TRawBatchRequest retryBatch(context.Config); - - while (batchRequest.BatchSize()) { - auto parameters = TNode::CreateMap(); - TInstant nextTry; - batchRequest.FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry); - if (nextTry) { - SleepUntil(nextTry); - } - parameters["concurrency"] = concurrency; - auto body = NodeToYsonString(parameters); - THttpHeader header("POST", "execute_batch"); - header.AddMutationId(); - TResponseInfo result; - try { - result = RequestWithRetry<TResponseInfo>( - retryPolicy, - [&context, &header, &body] (TMutationId& mutationId) { - auto response = RequestWithoutRetry(context, mutationId, header, body); - return TResponseInfo{ - .RequestId = response->GetRequestId(), - .Response = response->GetResponse(), - .HttpCode = response->GetStatusCode(), - }; - }); - } catch (const std::exception& e) { - batchRequest.SetErrorResult(std::current_exception()); - retryBatch.SetErrorResult(std::current_exception()); - throw; - } - batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch); - } - - batchRequest = std::move(retryBatch); - } -} - TOperationAttributes ParseOperationAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); @@ -329,7 +271,7 @@ TVector<TRichYPath> CanonizeYPaths( for (int i = 0; i < static_cast<int>(paths.size()); ++i) { futures.push_back(batch.CanonizeYPath(paths[i])); } - ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{}); + batch.ExecuteBatch(retryPolicy, context); TVector<TRichYPath> result; result.reserve(futures.size()); for (auto& future : futures) { |
