diff options
| author | hiddenpath <[email protected]> | 2025-01-14 17:10:48 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2025-01-14 17:27:54 +0300 |
| commit | 6be1dcf71e42ab7141984480de1c8f255329a923 (patch) | |
| tree | a52a2a345e598032e38fec00e7ee86079f062645 /yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | |
| parent | e6ef3b21f38c311f6b0d6adb666a5d5b2190b32d (diff) | |
Make ExecuteBatch as a method of TRawBatchRequest
commit_hash:348e61a3cea27801ce90771a3f05cdee821b245f
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 60 |
1 files changed, 54 insertions, 6 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp index 1bd9532e4ca..12623cf57eb 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp @@ -18,6 +18,8 @@ #include <yt/cpp/mapreduce/http/retry_request.h> #include <util/generic/guid.h> +#include <util/generic/scope.h> + #include <util/string/builder.h> #include <exception> @@ -289,6 +291,56 @@ TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config) TRawBatchRequest::~TRawBatchRequest() = default; +void TRawBatchRequest::ExecuteBatch( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + const TExecuteBatchOptions& options) +{ + if (IsExecuted()) { + ythrow yexception() << "Cannot execute batch request since it is already executed"; + } + Y_DEFER { + MarkExecuted(); + }; + + const auto concurrency = options.Concurrency_.GetOrElse(50); + const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); + + if (!retryPolicy) { + retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + } + + while (BatchSize()) { + auto parameters = TNode::CreateMap(); + TInstant nextTry; + FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry); + if (nextTry) { + SleepUntil(nextTry); + } + parameters["concurrency"] = concurrency; + auto body = NodeToYsonString(parameters); + THttpHeader header("POST", "execute_batch"); + header.AddMutationId(); + TResponseInfo result; + try { + result = RequestWithRetry<TResponseInfo>( + retryPolicy, + [&context, &header, &body] (TMutationId& mutationId) { + auto response = RequestWithoutRetry(context, mutationId, header, body); + return TResponseInfo{ + .RequestId = response->GetRequestId(), + .Response = response->GetResponse(), + .HttpCode = response->GetStatusCode(), + }; + }); + } catch (const std::exception& e) { + SetErrorResult(std::current_exception()); + throw; + } + ParseResponse(std::move(result), retryPolicy.Get()); + } +} + bool TRawBatchRequest::IsExecuted() const { return Executed_; @@ -612,22 +664,18 @@ void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant void TRawBatchRequest::ParseResponse( const TResponseInfo& requestResult, const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, TInstant now) { TNode node = NodeFromYsonString(requestResult.Response); - return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now); + return ParseResponse(node, requestResult.RequestId, retryPolicy, now); } void TRawBatchRequest::ParseResponse( TNode node, const TString& requestId, const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, TInstant now) { - Y_ABORT_UNLESS(retryBatch); - EnsureType(node, TNode::List); auto& responseList = node.AsList(); const auto size = responseList.size(); @@ -655,7 +703,7 @@ void TRawBatchRequest::ParseResponse( "Batch subrequest (%s) failed, will retry, error: %s", RequestInfo(BatchItemList_[i].Parameters), error.what()); - retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); + AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); } else { YT_LOG_ERROR( "Batch subrequest (%s) failed, error: %s", |
