diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 20:17:09 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 21:21:14 +0300 |
commit | bb37d56338bace93813adc51a61936a19acc8edc (patch) | |
tree | 3d771660421001c994e2baf9a40b409d06c7a24b /yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | |
parent | 9a04c0af35c6a47061b3699089da07acfcf14587 (diff) | |
download | ydb-bb37d56338bace93813adc51a61936a19acc8edc.tar.gz |
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
Diffstat (limited to 'yt/cpp/mapreduce/raw_client/raw_batch_request.cpp')
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 52 |
1 files changed, 25 insertions, 27 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp index 1a340b8c17..08fee25b79 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp @@ -285,16 +285,14 @@ THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInsta //////////////////////////////////////////////////////////////////////////////// -THttpRawBatchRequest::THttpRawBatchRequest(const TConfigPtr& config) - : Config_(config) +THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy) + : Context_(context) + , RequestRetryPolicy_(std::move(retryPolicy)) { } THttpRawBatchRequest::~THttpRawBatchRequest() = default; -void THttpRawBatchRequest::ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - const TExecuteBatchOptions& options) +void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options) { if (IsExecuted()) { ythrow yexception() << "Cannot execute batch request since it is already executed"; @@ -306,8 +304,8 @@ void THttpRawBatchRequest::ExecuteBatch( const auto concurrency = options.Concurrency_.GetOrElse(50); const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); - if (!retryPolicy) { - retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + if (!RequestRetryPolicy_) { + RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config); } while (BatchSize()) { @@ -324,9 +322,9 @@ void THttpRawBatchRequest::ExecuteBatch( TResponseInfo result; try { result = RequestWithRetry<TResponseInfo>( - retryPolicy, - [&context, &header, &body] (TMutationId& mutationId) { - auto response = RequestWithoutRetry(context, mutationId, header, body); + RequestRetryPolicy_, + [this, &header, &body] (TMutationId& mutationId) { + auto response = RequestWithoutRetry(Context_, mutationId, header, body); return TResponseInfo{ .RequestId = response->GetRequestId(), .Response = response->GetResponse(), @@ -337,7 +335,7 @@ void THttpRawBatchRequest::ExecuteBatch( SetErrorResult(std::current_exception()); throw; } - ParseResponse(std::move(result), retryPolicy.Get()); + ParseResponse(std::move(result), RequestRetryPolicy_.Get()); } } @@ -392,7 +390,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Create( { return AddRequest<TGuidResponseParser>( "create", - SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options), + SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options), Nothing()); } @@ -403,7 +401,7 @@ TFuture<void> THttpRawBatchRequest::Remove( { return AddRequest<TVoidResponseParser>( "remove", - SerializeParamsForRemove(transaction, Config_->Prefix, path, options), + SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -414,7 +412,7 @@ TFuture<bool> THttpRawBatchRequest::Exists( { return AddRequest<TExistsResponseParser>( "exists", - SerializeParamsForExists(transaction, Config_->Prefix, path, options), + SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -425,7 +423,7 @@ TFuture<TNode> THttpRawBatchRequest::Get( { return AddRequest<TGetResponseParser>( "get", - SerializeParamsForGet(transaction, Config_->Prefix, path, options), + SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -437,7 +435,7 @@ TFuture<void> THttpRawBatchRequest::Set( { return AddRequest<TVoidResponseParser>( "set", - SerializeParamsForSet(transaction, Config_->Prefix, path, options), + SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options), node); } @@ -448,7 +446,7 @@ TFuture<TNode::TListType> THttpRawBatchRequest::List( { return AddRequest<TListResponseParser>( "list", - SerializeParamsForList(transaction, Config_->Prefix, path, options), + SerializeParamsForList(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -460,7 +458,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Copy( { return AddRequest<TGuidResponseParser>( "copy", - SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options), + SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } @@ -472,7 +470,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Move( { return AddRequest<TGuidResponseParser>( "move", - SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options), + SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } @@ -484,7 +482,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Link( { return AddRequest<TGuidResponseParser>( "link", - SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options), + SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options), Nothing()); } @@ -496,7 +494,7 @@ TFuture<TLockId> THttpRawBatchRequest::Lock( { return AddRequest<TGuidResponseParser>( "lock", - SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options), + SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options), Nothing()); } @@ -507,7 +505,7 @@ TFuture<void> THttpRawBatchRequest::Unlock( { return AddRequest<TVoidResponseParser>( "unlock", - SerializeParamsForUnlock(transaction, Config_->Prefix, path, options), + SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -532,7 +530,7 @@ TFuture<TYPath> THttpRawBatchRequest::PutFileToCache( { return AddRequest<TYPathParser>( "put_file_to_cache", - SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options), + SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options), Nothing()); } @@ -544,7 +542,7 @@ TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission( { return AddRequest<TCheckPermissionParser>( "check_permission", - SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options), + SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options), Nothing()); } @@ -607,7 +605,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path) TRichYPath result = path; // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath. if (!result.Path_.StartsWith("<")) { - result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); + result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix); } if (result.Path_.find_first_of("<>{}[]") != TString::npos) { @@ -615,7 +613,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path) "parse_ypath", SerializeParamsForParseYPath(result), Nothing(), - MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result)); + MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result)); } return NThreading::MakeFuture(result); } |