diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 20:17:09 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 21:21:14 +0300 |
commit | bb37d56338bace93813adc51a61936a19acc8edc (patch) | |
tree | 3d771660421001c994e2baf9a40b409d06c7a24b /yt/cpp/mapreduce/raw_client | |
parent | 9a04c0af35c6a47061b3699089da07acfcf14587 (diff) | |
download | ydb-bb37d56338bace93813adc51a61936a19acc8edc.tar.gz |
YT-23616: Make THttpRawBatchRequest as an implementation of IRawBatchRequest interface
commit_hash:9e6c556686dda1562697762d38da532dc5c87b80
Diffstat (limited to 'yt/cpp/mapreduce/raw_client')
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.cpp | 52 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_batch_request.h | 88 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp | 2 |
7 files changed, 93 insertions, 65 deletions
diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp index 1a340b8c17..08fee25b79 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.cpp @@ -285,16 +285,14 @@ THttpRawBatchRequest::TBatchItem::TBatchItem(const TBatchItem& batchItem, TInsta //////////////////////////////////////////////////////////////////////////////// -THttpRawBatchRequest::THttpRawBatchRequest(const TConfigPtr& config) - : Config_(config) +THttpRawBatchRequest::THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy) + : Context_(context) + , RequestRetryPolicy_(std::move(retryPolicy)) { } THttpRawBatchRequest::~THttpRawBatchRequest() = default; -void THttpRawBatchRequest::ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - const TExecuteBatchOptions& options) +void THttpRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& options) { if (IsExecuted()) { ythrow yexception() << "Cannot execute batch request since it is already executed"; @@ -306,8 +304,8 @@ void THttpRawBatchRequest::ExecuteBatch( const auto concurrency = options.Concurrency_.GetOrElse(50); const auto batchPartMaxSize = options.BatchPartMaxSize_.GetOrElse(concurrency * 5); - if (!retryPolicy) { - retryPolicy = CreateDefaultRequestRetryPolicy(context.Config); + if (!RequestRetryPolicy_) { + RequestRetryPolicy_ = CreateDefaultRequestRetryPolicy(Context_.Config); } while (BatchSize()) { @@ -324,9 +322,9 @@ void THttpRawBatchRequest::ExecuteBatch( TResponseInfo result; try { result = RequestWithRetry<TResponseInfo>( - retryPolicy, - [&context, &header, &body] (TMutationId& mutationId) { - auto response = RequestWithoutRetry(context, mutationId, header, body); + RequestRetryPolicy_, + [this, &header, &body] (TMutationId& mutationId) { + auto response = RequestWithoutRetry(Context_, mutationId, header, body); return TResponseInfo{ .RequestId = response->GetRequestId(), .Response = response->GetResponse(), @@ -337,7 +335,7 @@ void THttpRawBatchRequest::ExecuteBatch( SetErrorResult(std::current_exception()); throw; } - ParseResponse(std::move(result), retryPolicy.Get()); + ParseResponse(std::move(result), RequestRetryPolicy_.Get()); } } @@ -392,7 +390,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Create( { return AddRequest<TGuidResponseParser>( "create", - SerializeParamsForCreate(transaction, Config_->Prefix, path, type, options), + SerializeParamsForCreate(transaction, Context_.Config->Prefix, path, type, options), Nothing()); } @@ -403,7 +401,7 @@ TFuture<void> THttpRawBatchRequest::Remove( { return AddRequest<TVoidResponseParser>( "remove", - SerializeParamsForRemove(transaction, Config_->Prefix, path, options), + SerializeParamsForRemove(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -414,7 +412,7 @@ TFuture<bool> THttpRawBatchRequest::Exists( { return AddRequest<TExistsResponseParser>( "exists", - SerializeParamsForExists(transaction, Config_->Prefix, path, options), + SerializeParamsForExists(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -425,7 +423,7 @@ TFuture<TNode> THttpRawBatchRequest::Get( { return AddRequest<TGetResponseParser>( "get", - SerializeParamsForGet(transaction, Config_->Prefix, path, options), + SerializeParamsForGet(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -437,7 +435,7 @@ TFuture<void> THttpRawBatchRequest::Set( { return AddRequest<TVoidResponseParser>( "set", - SerializeParamsForSet(transaction, Config_->Prefix, path, options), + SerializeParamsForSet(transaction, Context_.Config->Prefix, path, options), node); } @@ -448,7 +446,7 @@ TFuture<TNode::TListType> THttpRawBatchRequest::List( { return AddRequest<TListResponseParser>( "list", - SerializeParamsForList(transaction, Config_->Prefix, path, options), + SerializeParamsForList(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -460,7 +458,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Copy( { return AddRequest<TGuidResponseParser>( "copy", - SerializeParamsForCopy(transaction, Config_->Prefix, sourcePath, destinationPath, options), + SerializeParamsForCopy(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } @@ -472,7 +470,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Move( { return AddRequest<TGuidResponseParser>( "move", - SerializeParamsForMove(transaction, Config_->Prefix, sourcePath, destinationPath, options), + SerializeParamsForMove(transaction, Context_.Config->Prefix, sourcePath, destinationPath, options), Nothing()); } @@ -484,7 +482,7 @@ TFuture<TNodeId> THttpRawBatchRequest::Link( { return AddRequest<TGuidResponseParser>( "link", - SerializeParamsForLink(transaction, Config_->Prefix, targetPath, linkPath, options), + SerializeParamsForLink(transaction, Context_.Config->Prefix, targetPath, linkPath, options), Nothing()); } @@ -496,7 +494,7 @@ TFuture<TLockId> THttpRawBatchRequest::Lock( { return AddRequest<TGuidResponseParser>( "lock", - SerializeParamsForLock(transaction, Config_->Prefix, path, mode, options), + SerializeParamsForLock(transaction, Context_.Config->Prefix, path, mode, options), Nothing()); } @@ -507,7 +505,7 @@ TFuture<void> THttpRawBatchRequest::Unlock( { return AddRequest<TVoidResponseParser>( "unlock", - SerializeParamsForUnlock(transaction, Config_->Prefix, path, options), + SerializeParamsForUnlock(transaction, Context_.Config->Prefix, path, options), Nothing()); } @@ -532,7 +530,7 @@ TFuture<TYPath> THttpRawBatchRequest::PutFileToCache( { return AddRequest<TYPathParser>( "put_file_to_cache", - SerializeParamsForPutFileToCache(transactionId, Config_->Prefix, filePath, md5Signature, cachePath, options), + SerializeParamsForPutFileToCache(transactionId, Context_.Config->Prefix, filePath, md5Signature, cachePath, options), Nothing()); } @@ -544,7 +542,7 @@ TFuture<TCheckPermissionResponse> THttpRawBatchRequest::CheckPermission( { return AddRequest<TCheckPermissionParser>( "check_permission", - SerializeParamsForCheckPermission(user, permission, Config_->Prefix, path, options), + SerializeParamsForCheckPermission(user, permission, Context_.Config->Prefix, path, options), Nothing()); } @@ -607,7 +605,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path) TRichYPath result = path; // Out of the symbols in the canonization branch below, only '<' can appear in the beggining of a valid rich YPath. if (!result.Path_.StartsWith("<")) { - result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); + result.Path_ = AddPathPrefix(result.Path_, Context_.Config->Prefix); } if (result.Path_.find_first_of("<>{}[]") != TString::npos) { @@ -615,7 +613,7 @@ TFuture<TRichYPath> THttpRawBatchRequest::CanonizeYPath(const TRichYPath& path) "parse_ypath", SerializeParamsForParseYPath(result), Nothing(), - MakeIntrusive<TCanonizeYPathResponseParser>(Config_->Prefix, result)); + MakeIntrusive<TCanonizeYPathResponseParser>(Context_.Config->Prefix, result)); } return NThreading::MakeFuture(result); } diff --git a/yt/cpp/mapreduce/raw_client/raw_batch_request.h b/yt/cpp/mapreduce/raw_client/raw_batch_request.h index 670b97a843..2af0e31305 100644 --- a/yt/cpp/mapreduce/raw_client/raw_batch_request.h +++ b/yt/cpp/mapreduce/raw_client/raw_batch_request.h @@ -2,13 +2,14 @@ #include <yt/cpp/mapreduce/common/fwd.h> -#include <yt/cpp/mapreduce/interface/batch_request.h> +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/requests.h> + #include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/cpp/mapreduce/interface/node.h> +#include <yt/cpp/mapreduce/interface/raw_batch_request.h> #include <yt/cpp/mapreduce/interface/retry_policy.h> -#include <yt/cpp/mapreduce/http/requests.h> - #include <library/cpp/threading/future/future.h> #include <util/generic/ptr.h> @@ -25,7 +26,7 @@ namespace NYT::NDetail::NRawClient { //////////////////////////////////////////////////////////////////////////////// class THttpRawBatchRequest - : public TThrRefBase + : public IRawBatchRequest { public: struct IResponseItemParser @@ -38,13 +39,10 @@ public: }; public: - THttpRawBatchRequest(const TConfigPtr& config); + THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy); ~THttpRawBatchRequest(); - void ExecuteBatch( - IRequestRetryPolicyPtr retryPolicy, - const TClientContext& context, - const TExecuteBatchOptions& options = {}); + void ExecuteBatch(const TExecuteBatchOptions& options = {}) override; bool IsExecuted() const; void MarkExecuted(); @@ -68,91 +66,113 @@ public: const TTransactionId& transaction, const TYPath& path, ENodeType type, - const TCreateOptions& options); + const TCreateOptions& options = {}) override; + ::NThreading::TFuture<void> Remove( const TTransactionId& transaction, const TYPath& path, - const TRemoveOptions& options); + const TRemoveOptions& options = {}) override; + ::NThreading::TFuture<bool> Exists( const TTransactionId& transaction, const TYPath& path, - const TExistsOptions& options); + const TExistsOptions& options = {}) override; + ::NThreading::TFuture<TNode> Get( const TTransactionId& transaction, const TYPath& path, - const TGetOptions& options); + const TGetOptions& options = {}) override; + ::NThreading::TFuture<void> Set( const TTransactionId& transaction, const TYPath& path, const TNode& value, - const TSetOptions& options); + const TSetOptions& options = {}) override; + ::NThreading::TFuture<TNode::TListType> List( const TTransactionId& transaction, const TYPath& path, - const TListOptions& options); + const TListOptions& options = {}) override; + ::NThreading::TFuture<TNodeId> Copy( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, - const TCopyOptions& options); + const TCopyOptions& options = {}) override; + ::NThreading::TFuture<TNodeId> Move( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, - const TMoveOptions& options); + const TMoveOptions& options = {}) override; + ::NThreading::TFuture<TNodeId> Link( const TTransactionId& transaction, const TYPath& targetPath, const TYPath& linkPath, - const TLinkOptions& options); + const TLinkOptions& options = {}) override; + ::NThreading::TFuture<TLockId> Lock( const TTransactionId& transaction, const TYPath& path, ELockMode mode, - const TLockOptions& options); + const TLockOptions& options = {}) override; + ::NThreading::TFuture<void> Unlock( const TTransactionId& transaction, const TYPath& path, - const TUnlockOptions& options); + const TUnlockOptions& options = {}) override; + ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, const TYPath& cachePath, - const TGetFileFromCacheOptions& options); + const TGetFileFromCacheOptions& options = {}) override; + ::NThreading::TFuture<TYPath> PutFileToCache( const TTransactionId& transactionId, const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, - const TPutFileToCacheOptions& options); + const TPutFileToCacheOptions& options = {}) override; + ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission( const TString& user, EPermission permission, const TYPath& path, - const TCheckPermissionOptions& options); + const TCheckPermissionOptions& options = {}) override; + ::NThreading::TFuture<TOperationAttributes> GetOperation( const TOperationId& operationId, - const TGetOperationOptions& options); - ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId); - ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId); + const TGetOperationOptions& options = {}) override; + + ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId) override; + + ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId) override; + ::NThreading::TFuture<void> SuspendOperation( const TOperationId& operationId, - const TSuspendOperationOptions& options); + const TSuspendOperationOptions& options = {}) override; + ::NThreading::TFuture<void> ResumeOperation( const TOperationId& operationId, - const TResumeOperationOptions& options); + const TResumeOperationOptions& options = {}) override; + ::NThreading::TFuture<void> UpdateOperationParameters( const TOperationId& operationId, - const TUpdateOperationParametersOptions& options); - ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path); + const TUpdateOperationParametersOptions& options = {}) override; + + ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path) override; + ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics( const TTransactionId& transaction, const TVector<TRichYPath>& paths, - const TGetTableColumnarStatisticsOptions& options); + const TGetTableColumnarStatisticsOptions& options = {}) override; + ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions( const TTransactionId& transaction, const TVector<TRichYPath>& paths, - const TGetTablePartitionsOptions& options); + const TGetTablePartitionsOptions& options = {}) override; private: struct TBatchItem { @@ -182,7 +202,9 @@ private: void AddRequest(TBatchItem batchItem); private: - TConfigPtr Config_; + const TClientContext Context_; + + IRequestRetryPolicyPtr RequestRetryPolicy_; TDeque<TBatchItem> BatchItemList_; bool Executed_ = false; diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 53d2be114a..f25fe5a4ee 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -11,6 +11,7 @@ #include <yt/cpp/mapreduce/http/retry_request.h> #include <yt/cpp/mapreduce/interface/fluent.h> +#include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/tvm.h> @@ -925,6 +926,11 @@ ui64 THttpRawClient::GenerateTimestamp() return NodeFromYsonString(responseInfo->GetResponse()).AsUint64(); } +IRawBatchRequestPtr THttpRawClient::CreateRawBatchRequest() +{ + return MakeIntrusive<NRawClient::THttpRawBatchRequest>(Context_, /*retryPolicy*/ nullptr); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index e18e32a92e..f0e8378db3 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -335,6 +335,8 @@ public: ui64 GenerateTimestamp() override; + IRawBatchRequestPtr CreateRawBatchRequest() override; + private: const TClientContext Context_; }; diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index fd5c59ec20..6ba8a3f084 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -265,13 +265,13 @@ TVector<TRichYPath> CanonizeYPaths( const TClientContext& context, const TVector<TRichYPath>& paths) { - THttpRawBatchRequest batch(context.Config); + THttpRawBatchRequest batch(context, retryPolicy); TVector<NThreading::TFuture<TRichYPath>> futures; futures.reserve(paths.size()); for (int i = 0; i < static_cast<int>(paths.size()); ++i) { futures.push_back(batch.CanonizeYPath(paths[i])); } - batch.ExecuteBatch(retryPolicy, context); + batch.ExecuteBatch(); TVector<TRichYPath> result; result.reserve(futures.size()); for (auto& future : futures) { diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 860d4ae939..9b37293b12 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -59,13 +59,13 @@ auto BatchTransform( TBatchAdder batchAdder, const TExecuteBatchOptions& executeBatchOptions = {}) { - THttpRawBatchRequest batch(context.Config); + THttpRawBatchRequest batch(context, retryPolicy); using TFuture = decltype(batchAdder(batch, *std::begin(src))); TVector<TFuture> futures; for (const auto& el : src) { futures.push_back(batchAdder(batch, el)); } - batch.ExecuteBatch(retryPolicy, context, executeBatchOptions); + batch.ExecuteBatch(executeBatchOptions); using TDst = decltype(futures[0].ExtractValueSync()); TVector<TDst> result; result.reserve(std::size(src)); diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp index 36a6935e82..1871ee80fb 100644 --- a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp +++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp @@ -73,7 +73,7 @@ TVector<TString> GetAllPathsFromRequestList(const TNode& requestList) TEST(TBatchRequestImplTest, ParseResponse) { TClientContext context; - THttpRawBatchRequest batchRequest(context.Config); + THttpRawBatchRequest batchRequest(context, /*retryPolicy*/ nullptr); EXPECT_EQ(batchRequest.BatchSize(), 0u); |