diff options
| -rw-r--r-- | yt/cpp/mapreduce/client/batch_request_impl.cpp | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 4 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/prepare_operation.cpp | 5 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/client/yt_poller.cpp | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 60 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.h | 7 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 60 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 9 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp | 7 |
9 files changed, 68 insertions, 88 deletions
diff --git a/yt/cpp/mapreduce/client/batch_request_impl.cpp b/yt/cpp/mapreduce/client/batch_request_impl.cpp index d8084e9c457..eeab541001d 100644 --- a/yt/cpp/mapreduce/client/batch_request_impl.cpp +++ b/yt/cpp/mapreduce/client/batch_request_impl.cpp @@ -189,7 +189,7 @@ TFuture<TCheckPermissionResponse> TBatchRequest::CheckPermission( void TBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options) { - NYT::NDetail::ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), *Impl_, options); + Impl_->ExecuteBatch(Client_->GetRetryPolicy()->CreatePolicyForGenericRequest(), Client_->GetContext(), options); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index 70bd5f8b651..8065ca93b30 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -221,7 +221,7 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths) ELockMode::LM_SNAPSHOT, TLockOptions().Waitable(true))); } - ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), lockRequest); + lockRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext()); TVector<::NThreading::TFuture<TNode>> nodeIdFutures; nodeIdFutures.reserve(paths->size()); @@ -232,7 +232,7 @@ void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths) ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id", TGetOptions())); } - ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), getNodeIdRequest); + getNodeIdRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext()); for (size_t i = 0; i != paths->size(); ++i) { auto& richPath = (*paths)[i]; diff --git a/yt/cpp/mapreduce/client/prepare_operation.cpp b/yt/cpp/mapreduce/client/prepare_operation.cpp index 4441e6ae12d..54ed095cc42 100644 --- a/yt/cpp/mapreduce/client/prepare_operation.cpp +++ b/yt/cpp/mapreduce/client/prepare_operation.cpp @@ -87,10 +87,7 @@ const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() con schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{})); } - NRawClient::ExecuteBatch( - RetryPolicy_->CreatePolicyForGenericRequest(), - Context_, - batch); + batch.ExecuteBatch(RetryPolicy_->CreatePolicyForGenericRequest(), Context_); for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) { if (schemaFutures[tableIndex].Initialized()) { diff --git a/yt/cpp/mapreduce/client/yt_poller.cpp b/yt/cpp/mapreduce/client/yt_poller.cpp index 5750f5840c8..b67d07beb87 100644 --- a/yt/cpp/mapreduce/client/yt_poller.cpp +++ b/yt/cpp/mapreduce/client/yt_poller.cpp @@ -99,7 +99,7 @@ void TYtPoller::WatchLoop() } try { - ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, rawBatchRequest); + rawBatchRequest.ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_); } catch (const std::exception& ex) { YT_LOG_ERROR("Exception while executing batch request: %v", ex.what()); } diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp index 1bd9532e4ca..12623cf57eb 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp @@ -18,6 +18,8 @@ #include <yt/cpp/mapreduce/http/retry_request.h> #include <util/generic/guid.h> +#include <util/generic/scope.h> + #include <util/string/builder.h> #include <exception> @@ -289,6 +291,56 @@ TRawBatchRequest::TRawBatchRequest(const TConfigPtr& config) TRawBatchRequest::~TRawBatchRequest() = default; +void TRawBatchRequest::ExecuteBatch( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + const TExecuteBatchOptions& options) +{ + if (IsExecuted()) { + ythrow yexception() << "Cannot execute batch request since it is already executed"; + } + Y_DEFER { + MarkExecuted(); + }; + + const auto concurrency = options.Concurrency_.GetOrElse(50); + const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); + + if (!retryPolicy) { + retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + } + + while (BatchSize()) { + auto parameters = TNode::CreateMap(); + TInstant nextTry; + FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry); + if (nextTry) { + SleepUntil(nextTry); + } + parameters["concurrency"] = concurrency; + auto body = NodeToYsonString(parameters); + THttpHeader header("POST", "execute_batch"); + header.AddMutationId(); + TResponseInfo result; + try { + result = RequestWithRetry<TResponseInfo>( + retryPolicy, + [&context, &header, &body] (TMutationId& mutationId) { + auto response = RequestWithoutRetry(context, mutationId, header, body); + return TResponseInfo{ + .RequestId = response->GetRequestId(), + .Response = response->GetResponse(), + .HttpCode = response->GetStatusCode(), + }; + }); + } catch (const std::exception& e) { + SetErrorResult(std::current_exception()); + throw; + } + ParseResponse(std::move(result), retryPolicy.Get()); + } +} + bool TRawBatchRequest::IsExecuted() const { return Executed_; @@ -612,22 +664,18 @@ void TRawBatchRequest::FillParameterList(size_t maxSize, TNode* result, TInstant void TRawBatchRequest::ParseResponse( const TResponseInfo& requestResult, const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, TInstant now) { TNode node = NodeFromYsonString(requestResult.Response); - return ParseResponse(node, requestResult.RequestId, retryPolicy, retryBatch, now); + return ParseResponse(node, requestResult.RequestId, retryPolicy, now); } void TRawBatchRequest::ParseResponse( TNode node, const TString& requestId, const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, TInstant now) { - Y_ABORT_UNLESS(retryBatch); - EnsureType(node, TNode::List); auto& responseList = node.AsList(); const auto size = responseList.size(); @@ -655,7 +703,7 @@ void TRawBatchRequest::ParseResponse( "Batch subrequest (%s) failed, will retry, error: %s", RequestInfo(BatchItemList_[i].Parameters), error.what()); - retryBatch->AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); + AddRequest(TBatchItem(BatchItemList_[i], now + *curInterval)); } else { YT_LOG_ERROR( "Batch subrequest (%s) failed, error: %s", diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h index 7ed5bebf5e0..e5126c64382 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.h +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h @@ -41,6 +41,11 @@ public: TRawBatchRequest(const TConfigPtr& config); ~TRawBatchRequest(); + void ExecuteBatch( + IRequestRetryPolicyPtr retryPolicy, + const TClientContext& context, + const TExecuteBatchOptions& options = {}); + bool IsExecuted() const; void MarkExecuted(); @@ -51,13 +56,11 @@ public: void ParseResponse( const TResponseInfo& requestResult, const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, TInstant now = TInstant::Now()); void ParseResponse( TNode response, const TString& requestId, const IRequestRetryPolicyPtr& retryPolicy, - TRawBatchRequest* retryBatch, TInstant now = TInstant::Now()); void SetErrorResult(std::exception_ptr e) const; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 601fe887b48..462cf84b411 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -30,64 +30,6 @@ namespace NYT::NDetail::NRawClient { //////////////////////////////////////////////////////////////////////////////// -void ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - TRawBatchRequest& batchRequest, - const TExecuteBatchOptions& options) -{ - if (batchRequest.IsExecuted()) { - ythrow yexception() << "Cannot execute batch request since it is already executed"; - } - Y_DEFER { - batchRequest.MarkExecuted(); - }; - - const auto concurrency = options.Concurrency_.GetOrElse(50); - const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); - - if (!retryPolicy) { - retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); - } - - while (batchRequest.BatchSize()) { - TRawBatchRequest retryBatch(context.Config); - - while (batchRequest.BatchSize()) { - auto parameters = TNode::CreateMap(); - TInstant nextTry; - batchRequest.FillParameterList(batchPartMaxSize, ¶meters["requests"], &nextTry); - if (nextTry) { - SleepUntil(nextTry); - } - parameters["concurrency"] = concurrency; - auto body = NodeToYsonString(parameters); - THttpHeader header("POST", "execute_batch"); - header.AddMutationId(); - TResponseInfo result; - try { - result = RequestWithRetry<TResponseInfo>( - retryPolicy, - [&context, &header, &body] (TMutationId& mutationId) { - auto response = RequestWithoutRetry(context, mutationId, header, body); - return TResponseInfo{ - .RequestId = response->GetRequestId(), - .Response = response->GetResponse(), - .HttpCode = response->GetStatusCode(), - }; - }); - } catch (const std::exception& e) { - batchRequest.SetErrorResult(std::current_exception()); - retryBatch.SetErrorResult(std::current_exception()); - throw; - } - batchRequest.ParseResponse(std::move(result), retryPolicy.Get(), &retryBatch); - } - - batchRequest = std::move(retryBatch); - } -} - TOperationAttributes ParseOperationAttributes(const TNode& node) { const auto& mapNode = node.AsMap(); @@ -329,7 +271,7 @@ TVector<TRichYPath> CanonizeYPaths( for (int i = 0; i < static_cast<int>(paths.size()); ++i) { futures.push_back(batch.CanonizeYPath(paths[i])); } - ExecuteBatch(retryPolicy, context, batch, TExecuteBatchOptions{}); + batch.ExecuteBatch(retryPolicy, context); TVector<TRichYPath> result; result.reserve(futures.size()); for (auto& future : futures) { diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index f46ae0e3b96..360ea0aba10 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -32,13 +32,6 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); //////////////////////////////////////////////////////////////////////////////// -// marks `batchRequest' as executed -void ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - TRawBatchRequest& batchRequest, - const TExecuteBatchOptions& options = {}); - TRichYPath CanonizeYPath( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, @@ -72,7 +65,7 @@ auto BatchTransform( for (const auto& el : src) { futures.push_back(batchAdder(batch, el)); } - ExecuteBatch(retryPolicy, context, batch, executeBatchOptions); + batch.ExecuteBatch(retryPolicy, context, executeBatchOptions); using TDst = decltype(futures[0].ExtractValueSync()); TVector<TDst> result; result.reserve(std::size(src)); diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp index cee62cf1f79..7c9b1b96708 100644 --- a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp +++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp @@ -97,7 +97,6 @@ TEST(TBatchRequestImplTest, ParseResponse) { auto testRetryPolicy = MakeIntrusive<TTestRetryPolicy>(); const TInstant now = TInstant::Seconds(100500); - TRawBatchRequest retryBatch(context.Config); batchRequest.ParseResponse( TNode() .Add(TNode()("output", 5)) @@ -107,15 +106,13 @@ TEST(TBatchRequestImplTest, ParseResponse) { TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(5)))), "<no-request-id>", testRetryPolicy, - &retryBatch, now); - EXPECT_EQ(batchRequest.BatchSize(), 0u); - EXPECT_EQ(retryBatch.BatchSize(), 2u); + EXPECT_EQ(batchRequest.BatchSize(), 2u); TNode retryParameterList; TInstant nextTry; - retryBatch.FillParameterList(3, &retryParameterList, &nextTry); + batchRequest.FillParameterList(3, &retryParameterList, &nextTry); EXPECT_EQ( GetAllPathsFromRequestList(retryParameterList), TVector<TString>({"//getError-3", "//getError-5"})); |
