summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2025-01-14 17:10:48 +0300
committerhiddenpath <[email protected]>2025-01-14 17:27:54 +0300
commit6be1dcf71e42ab7141984480de1c8f255329a923 (patch)
treea52a2a345e598032e38fec00e7ee86079f062645 /yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
parente6ef3b21f38c311f6b0d6adb666a5d5b2190b32d (diff)
Make ExecuteBatch as a method of TRawBatchRequest
commit_hash:348e61a3cea27801ce90771a3f05cdee821b245f
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp')
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.cpp60
1 files changed, 54 insertions, 6 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
index 1bd9532e4ca..12623cf57eb 100644
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
@@ -18,6 +18,8 @@
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <util/generic/guid.h>
+#include <util/generic/scope.h>
+
#include <util/string/builder.h>
#include <exception>
@@ -289,6 +291,56 @@ TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
TRawBatchRequest::~TRawBatchRequest() = default;
+void TRawBatchRequest::ExecuteBatch(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ const TExecuteBatchOptions& options)
+{
+ if (IsExecuted()) {
+ ythrow yexception() << "Cannot execute batch request since it is already executed";
+ }
+ Y_DEFER {
+ MarkExecuted();
+ };
+
+ const auto concurrency = options.Concurrency_.GetOrElse(50);
+ const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
+
+ if (!retryPolicy) {
+ retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
+ }
+
+ while (BatchSize()) {
+ auto parameters = TNode::CreateMap();
+ TInstant nextTry;
+ FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
+ if (nextTry) {
+ SleepUntil(nextTry);
+ }
+ parameters["concurrency"] = concurrency;
+ auto body = NodeToYsonString(parameters);
+ THttpHeader header("POST", "execute_batch");
+ header.AddMutationId();
+ TResponseInfo result;
+ try {
+ result = RequestWithRetry<TResponseInfo>(
+ retryPolicy,
+ [&context, &header, &body] (TMutationId& mutationId) {
+ auto response = RequestWithoutRetry(context, mutationId, header, body);
+ return TResponseInfo{
+ .RequestId = response->GetRequestId(),
+ .Response = response->GetResponse(),
+ .HttpCode = response->GetStatusCode(),
+ };
+ });
+ } catch (const std::exception& e) {
+ SetErrorResult(std::current_exception());
+ throw;
+ }
+ ParseResponse(std::move(result), retryPolicy.Get());
+ }
+}
+
bool TRawBatchRequest::IsExecuted() const
{
return Executed_;
@@ -612,22 +664,18 @@ void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant
void TRawBatchRequest::ParseResponse(
const TResponseInfo& requestResult,
const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
TInstant now)
{
TNode node = NodeFromYsonString(requestResult.Response);
- return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
+ return ParseResponse(node, requestResult.RequestId, retryPolicy, now);
}
void TRawBatchRequest::ParseResponse(
TNode node,
const TString& requestId,
const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
TInstant now)
{
- Y_ABORT_UNLESS(retryBatch);
-
EnsureType(node, TNode::List);
auto& responseList = node.AsList();
const auto size = responseList.size();
@@ -655,7 +703,7 @@ void TRawBatchRequest::ParseResponse(
"Batch subrequest (%s) failed, will retry, error: %s",
RequestInfo(BatchItemList_[i].Parameters),
error.what());
- retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
+ AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
} else {
YT_LOG_ERROR(
"Batch subrequest (%s) failed, error: %s",