summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2025-01-14 17:10:48 +0300
committerhiddenpath <[email protected]>2025-01-14 17:27:54 +0300
commit6be1dcf71e42ab7141984480de1c8f255329a923 (patch)
treea52a2a345e598032e38fec00e7ee86079f062645
parente6ef3b21f38c311f6b0d6adb666a5d5b2190b32d (diff)
Make ExecuteBatch as a method of TRawBatchRequest
commit_hash:348e61a3cea27801ce90771a3f05cdee821b245f
-rw-r--r--yt/cpp/mapreduce/client/batch_request_impl.cpp2
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp4
-rw-r--r--yt/cpp/mapreduce/client/prepare_operation.cpp5
-rw-r--r--yt/cpp/mapreduce/client/yt_poller.cpp2
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.cpp60
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_batch_request.h7
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp60
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h9
-rw-r--r--yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp7
9 files changed, 68 insertions, 88 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp
index d8084e9c457..eeab541001d 100644
--- a/yt/cpp/mapreduce/client/batch_request_impl.cpp
+++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp
@@ -189,7 +189,7 @@ TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission(
void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options)
{
- NYT::NDetail::ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), *Impl_, options);
+ Impl_->ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), options);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index 70bd5f8b651..8065ca93b30 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -221,7 +221,7 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
ELockMode::LM_SNAPSHOT,
TLockOptions().Waitable(true)));
}
- ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), lockRequest);
+ lockRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
nodeIdFutures.reserve(paths->size());
@@ -232,7 +232,7 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
TGetOptions()));
}
- ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), getNodeIdRequest);
+ getNodeIdRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext());
for (size_t i = 0; i != paths->size(); ++i) {
auto& richPath = (*paths)[i];
diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp
index 4441e6ae12d..54ed095cc42 100644
--- a/yt/cpp/mapreduce/client/prepare_operation.cpp
+++ b/yt/cpp/mapreduce/client/prepare_operation.cpp
@@ -87,10 +87,7 @@ const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() con
schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
}
- NRawClient::ExecuteBatch(
- RetryPolicy_->CreatePolicyForGenericRequest(),
- Context_,
- batch);
+ batch.ExecuteBatch(RetryPolicy_->CreatePolicyForGenericRequest(), Context_);
for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
if (schemaFutures[tableIndex].Initialized()) {
diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp
index 5750f5840c8..b67d07beb87 100644
--- a/yt/cpp/mapreduce/client/yt_poller.cpp
+++ b/yt/cpp/mapreduce/client/yt_poller.cpp
@@ -99,7 +99,7 @@ void TYtPoller::WatchLoop()
}
try {
- ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, rawBatchRequest);
+ rawBatchRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_);
} catch (const std::exception& ex) {
YT_LOG_ERROR("Exception while executing batch request: %v", ex.what());
}
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
index 1bd9532e4ca..12623cf57eb 100644
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp
@@ -18,6 +18,8 @@
#include <yt/cpp/mapreduce/http/retry_request.h>
#include <util/generic/guid.h>
+#include <util/generic/scope.h>
+
#include <util/string/builder.h>
#include <exception>
@@ -289,6 +291,56 @@ TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config)
TRawBatchRequest::~TRawBatchRequest() = default;
+void TRawBatchRequest::ExecuteBatch(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ const TExecuteBatchOptions& options)
+{
+ if (IsExecuted()) {
+ ythrow yexception() << "Cannot execute batch request since it is already executed";
+ }
+ Y_DEFER {
+ MarkExecuted();
+ };
+
+ const auto concurrency = options.Concurrency_.GetOrElse(50);
+ const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
+
+ if (!retryPolicy) {
+ retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
+ }
+
+ while (BatchSize()) {
+ auto parameters = TNode::CreateMap();
+ TInstant nextTry;
+ FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
+ if (nextTry) {
+ SleepUntil(nextTry);
+ }
+ parameters["concurrency"] = concurrency;
+ auto body = NodeToYsonString(parameters);
+ THttpHeader header("POST", "execute_batch");
+ header.AddMutationId();
+ TResponseInfo result;
+ try {
+ result = RequestWithRetry<TResponseInfo>(
+ retryPolicy,
+ [&context, &header, &body] (TMutationId& mutationId) {
+ auto response = RequestWithoutRetry(context, mutationId, header, body);
+ return TResponseInfo{
+ .RequestId = response->GetRequestId(),
+ .Response = response->GetResponse(),
+ .HttpCode = response->GetStatusCode(),
+ };
+ });
+ } catch (const std::exception& e) {
+ SetErrorResult(std::current_exception());
+ throw;
+ }
+ ParseResponse(std::move(result), retryPolicy.Get());
+ }
+}
+
bool TRawBatchRequest::IsExecuted() const
{
return Executed_;
@@ -612,22 +664,18 @@ void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant
void TRawBatchRequest::ParseResponse(
const TResponseInfo& requestResult,
const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
TInstant now)
{
TNode node = NodeFromYsonString(requestResult.Response);
- return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now);
+ return ParseResponse(node, requestResult.RequestId, retryPolicy, now);
}
void TRawBatchRequest::ParseResponse(
TNode node,
const TString& requestId,
const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
TInstant now)
{
- Y_ABORT_UNLESS(retryBatch);
-
EnsureType(node, TNode::List);
auto& responseList = node.AsList();
const auto size = responseList.size();
@@ -655,7 +703,7 @@ void TRawBatchRequest::ParseResponse(
"Batch subrequest (%s) failed, will retry, error: %s",
RequestInfo(BatchItemList_[i].Parameters),
error.what());
- retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
+ AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval));
} else {
YT_LOG_ERROR(
"Batch subrequest (%s) failed, error: %s",
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
index 7ed5bebf5e0..e5126c64382 100644
--- a/yt/cpp/mapreduce/raw_client/raw_batch_request.h
+++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h
@@ -41,6 +41,11 @@ public:
TRawBatchRequest(const TConfigPtr& config);
~TRawBatchRequest();
+ void ExecuteBatch(
+ IRequestRetryPolicyPtr retryPolicy,
+ const TClientContext& context,
+ const TExecuteBatchOptions& options = {});
+
bool IsExecuted() const;
void MarkExecuted();
@@ -51,13 +56,11 @@ public:
void ParseResponse(
const TResponseInfo& requestResult,
const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
TInstant now = TInstant::Now());
void ParseResponse(
TNode response,
const TString& requestId,
const IRequestRetryPolicyPtr& retryPolicy,
- TRawBatchRequest* retryBatch,
TInstant now = TInstant::Now());
void SetErrorResult(std::exception_ptr e) const;
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 601fe887b48..462cf84b411 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -30,64 +30,6 @@ namespace NYT::NDetail::NRawClient {
////////////////////////////////////////////////////////////////////////////////
-void ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- TRawBatchRequest& batchRequest,
- const TExecuteBatchOptions& options)
-{
- if (batchRequest.IsExecuted()) {
- ythrow yexception() << "Cannot execute batch request since it is already executed";
- }
- Y_DEFER {
- batchRequest.MarkExecuted();
- };
-
- const auto concurrency = options.Concurrency_.GetOrElse(50);
- const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5);
-
- if (!retryPolicy) {
- retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
- }
-
- while (batchRequest.BatchSize()) {
- TRawBatchRequest retryBatch(context.Config);
-
- while (batchRequest.BatchSize()) {
- auto parameters = TNode::CreateMap();
- TInstant nextTry;
- batchRequest.FillParameterList(batchPartMaxSize, &parameters["requests"], &nextTry);
- if (nextTry) {
- SleepUntil(nextTry);
- }
- parameters["concurrency"] = concurrency;
- auto body = NodeToYsonString(parameters);
- THttpHeader header("POST", "execute_batch");
- header.AddMutationId();
- TResponseInfo result;
- try {
- result = RequestWithRetry<TResponseInfo>(
- retryPolicy,
- [&context, &header, &body] (TMutationId& mutationId) {
- auto response = RequestWithoutRetry(context, mutationId, header, body);
- return TResponseInfo{
- .RequestId = response->GetRequestId(),
- .Response = response->GetResponse(),
- .HttpCode = response->GetStatusCode(),
- };
- });
- } catch (const std::exception& e) {
- batchRequest.SetErrorResult(std::current_exception());
- retryBatch.SetErrorResult(std::current_exception());
- throw;
- }
- batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch);
- }
-
- batchRequest = std::move(retryBatch);
- }
-}
-
TOperationAttributes ParseOperationAttributes(const TNode& node)
{
const auto& mapNode = node.AsMap();
@@ -329,7 +271,7 @@ TVector<TRichYPath> CanonizeYPaths(
for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
futures.push_back(batch.CanonizeYPath(paths[i]));
}
- ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{});
+ batch.ExecuteBatch(retryPolicy, context);
TVector<TRichYPath> result;
result.reserve(futures.size());
for (auto& future : futures) {
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index f46ae0e3b96..360ea0aba10 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -32,13 +32,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node);
////////////////////////////////////////////////////////////////////////////////
-// marks `batchRequest' as executed
-void ExecuteBatch(
- IRequestRetryPolicyPtr retryPolicy,
- const TClientContext& context,
- TRawBatchRequest& batchRequest,
- const TExecuteBatchOptions& options = {});
-
TRichYPath CanonizeYPath(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
@@ -72,7 +65,7 @@ auto BatchTransform(
for (const auto& el : src) {
futures.push_back(batchAdder(batch, el));
}
- ExecuteBatch(retryPolicy, context, batch, executeBatchOptions);
+ batch.ExecuteBatch(retryPolicy, context, executeBatchOptions);
using TDst = decltype(futures[0].ExtractValueSync());
TVector<TDst> result;
result.reserve(std::size(src));
diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
index cee62cf1f79..7c9b1b96708 100644
--- a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
+++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
@@ -97,7 +97,6 @@ TEST(TBatchRequestImplTest, ParseResponse) {
auto testRetryPolicy = MakeIntrusive<TTestRetryPolicy>();
const TInstant now = TInstant::Seconds(100500);
- TRawBatchRequest retryBatch(context.Config);
batchRequest.ParseResponse(
TNode()
.Add(TNode()("output", 5))
@@ -107,15 +106,13 @@ TEST(TBatchRequestImplTest, ParseResponse) {
TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(5)))),
"<no-request-id>",
testRetryPolicy,
- &retryBatch,
now);
- EXPECT_EQ(batchRequest.BatchSize(), 0u);
- EXPECT_EQ(retryBatch.BatchSize(), 2u);
+ EXPECT_EQ(batchRequest.BatchSize(), 2u);
TNode retryParameterList;
TInstant nextTry;
- retryBatch.FillParameterList(3, &retryParameterList, &nextTry);
+ batchRequest.FillParameterList(3, &retryParameterList, &nextTry);
EXPECT_EQ(
GetAllPathsFromRequestList(retryParameterList),
TVector<TString>({"//getError-3", "//getError-5"}));