summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client/raw_requests.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_requests.cpp')
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp60
1 files changed, 1 insertions, 59 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 601fe887b48..462cf84b411 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -30,64 +30,6 @@ namespace NYT::NDetail::NRawClient {
////////////////////////////////////////////////////////////////////////////////
-void ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- TRawBatchRequest& batchRequest,
- const TExecuteBatchOptions& options)
-{
- if (batchRequest.IsExecuted()) {
- ythrow yexception() << "Cannot execute batch request since it is already executed";
- }
- Y_DEFER {
- batchRequest.MarkExecuted();
- };
-
- const auto concurrency = options.Concurrency_.GetOrElse(50);
- const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
-
- if (!retryPolicy) {
- retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
- }
-
- while (batchRequest.BatchSize()) {
- TRawBatchRequest retryBatch(context.Config);
-
- while (batchRequest.BatchSize()) {
- auto parameters = TNode::CreateMap();
- TInstant nextTry;
- batchRequest.FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
- if (nextTry) {
- SleepUntil(nextTry);
- }
- parameters["concurrency"] = concurrency;
- auto body = NodeToYsonString(parameters);
- THttpHeader header("POST", "execute_batch");
- header.AddMutationId();
- TResponseInfo result;
- try {
- result = RequestWithRetry<TResponseInfo>(
- retryPolicy,
- [&context, &header, &body] (TMutationId& mutationId) {
- auto response = RequestWithoutRetry(context, mutationId, header, body);
- return TResponseInfo{
- .RequestId = response->GetRequestId(),
- .Response = response->GetResponse(),
- .HttpCode = response->GetStatusCode(),
- };
- });
- } catch (const std::exception& e) {
- batchRequest.SetErrorResult(std::current_exception());
- retryBatch.SetErrorResult(std::current_exception());
- throw;
- }
- batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch);
- }
-
- batchRequest = std::move(retryBatch);
- }
-}
-
TOperationAttributes ParseOperationAttributes(const TNode& node)
{
const auto& mapNode = node.AsMap();
@@ -329,7 +271,7 @@ TVector<TRichYPath> CanonizeYPaths(
for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
futures.push_back(batch.CanonizeYPath(paths[i]));
}
- ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{});
+ batch.ExecuteBatch(retryPolicy, context);
TVector<TRichYPath> result;
result.reserve(futures.size());
for (auto& future : futures) {