diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-22 08:47:22 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-22 09:04:11 +0300 |
commit | 044fc00c5520ec73b6146427ce9f1cf80ec6a95f (patch) | |
tree | 6d8b56e510374542ad49e5588c25d95701d7cf02 /yt/cpp/mapreduce/http_client/raw_batch_request.h | |
parent | 8f9ae59afa6108d373d287e973a7597c0a89143e (diff) | |
download | ydb-044fc00c5520ec73b6146427ce9f1cf80ec6a95f.tar.gz |
YT-23616: Rename raw_client to http_client
commit_hash:df330f3a0c0ca36d9bcf801fd96b964f1be6383a
Diffstat (limited to 'yt/cpp/mapreduce/http_client/raw_batch_request.h')
-rw-r--r-- | yt/cpp/mapreduce/http_client/raw_batch_request.h | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/http_client/raw_batch_request.h b/yt/cpp/mapreduce/http_client/raw_batch_request.h new file mode 100644 index 0000000000..2af0e31305 --- /dev/null +++ b/yt/cpp/mapreduce/http_client/raw_batch_request.h @@ -0,0 +1,215 @@ +#pragma once + +#include <yt/cpp/mapreduce/common/fwd.h> + +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/http/requests.h> + +#include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/node.h> +#include <yt/cpp/mapreduce/interface/raw_batch_request.h> +#include <yt/cpp/mapreduce/interface/retry_policy.h> + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/ptr.h> +#include <util/generic/deque.h> + +#include <exception> + +namespace NYT::NDetail { + struct TResponseInfo; +} + +namespace NYT::NDetail::NRawClient { + +//////////////////////////////////////////////////////////////////////////////// + +class THttpRawBatchRequest + : public IRawBatchRequest +{ +public: + struct IResponseItemParser + : public TThrRefBase + { + ~IResponseItemParser() = default; + + virtual void SetResponse(TMaybe<TNode> node) = 0; + virtual void SetException(std::exception_ptr e) = 0; + }; + +public: + THttpRawBatchRequest(const TClientContext& context, IRequestRetryPolicyPtr retryPolicy); + ~THttpRawBatchRequest(); + + void ExecuteBatch(const TExecuteBatchOptions& options = {}) override; + + bool IsExecuted() const; + void MarkExecuted(); + + void FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const; + + size_t BatchSize() const; + + void ParseResponse( + const TResponseInfo& requestResult, + const IRequestRetryPolicyPtr& retryPolicy, + TInstant now = TInstant::Now()); + void ParseResponse( + TNode response, + const TString& requestId, + const IRequestRetryPolicyPtr& retryPolicy, + TInstant now = TInstant::Now()); + void SetErrorResult(std::exception_ptr e) const; + + ::NThreading::TFuture<TNodeId> Create( + const TTransactionId& transaction, + const TYPath& path, + ENodeType type, + const TCreateOptions& options = {}) override; + + ::NThreading::TFuture<void> Remove( + const TTransactionId& transaction, + const TYPath& path, + const TRemoveOptions& options = {}) override; + + ::NThreading::TFuture<bool> Exists( + const TTransactionId& transaction, + const TYPath& path, + const TExistsOptions& options = {}) override; + + ::NThreading::TFuture<TNode> Get( + const TTransactionId& transaction, + const TYPath& path, + const TGetOptions& options = {}) override; + + ::NThreading::TFuture<void> Set( + const TTransactionId& transaction, + const TYPath& path, + const TNode& value, + const TSetOptions& options = {}) override; + + ::NThreading::TFuture<TNode::TListType> List( + const TTransactionId& transaction, + const TYPath& path, + const TListOptions& options = {}) override; + + ::NThreading::TFuture<TNodeId> Copy( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TCopyOptions& options = {}) override; + + ::NThreading::TFuture<TNodeId> Move( + const TTransactionId& transaction, + const TYPath& sourcePath, + const TYPath& destinationPath, + const TMoveOptions& options = {}) override; + + ::NThreading::TFuture<TNodeId> Link( + const TTransactionId& transaction, + const TYPath& targetPath, + const TYPath& linkPath, + const TLinkOptions& options = {}) override; + + ::NThreading::TFuture<TLockId> Lock( + const TTransactionId& transaction, + const TYPath& path, + ELockMode mode, + const TLockOptions& options = {}) override; + + ::NThreading::TFuture<void> Unlock( + const TTransactionId& transaction, + const TYPath& path, + const TUnlockOptions& options = {}) override; + + ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache( + const TTransactionId& transactionId, + const TString& md5Signature, + const TYPath& cachePath, + const TGetFileFromCacheOptions& options = {}) override; + + ::NThreading::TFuture<TYPath> PutFileToCache( + const TTransactionId& transactionId, + const TYPath& filePath, + const TString& md5Signature, + const TYPath& cachePath, + const TPutFileToCacheOptions& options = {}) override; + + ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission( + const TString& user, + EPermission permission, + const TYPath& path, + const TCheckPermissionOptions& options = {}) override; + + ::NThreading::TFuture<TOperationAttributes> GetOperation( + const TOperationId& operationId, + const TGetOperationOptions& options = {}) override; + + ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId) override; + + ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId) override; + + ::NThreading::TFuture<void> SuspendOperation( + const TOperationId& operationId, + const TSuspendOperationOptions& options = {}) override; + + ::NThreading::TFuture<void> ResumeOperation( + const TOperationId& operationId, + const TResumeOperationOptions& options = {}) override; + + ::NThreading::TFuture<void> UpdateOperationParameters( + const TOperationId& operationId, + const TUpdateOperationParametersOptions& options = {}) override; + + ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path) override; + + ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTableColumnarStatisticsOptions& options = {}) override; + + ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions( + const TTransactionId& transaction, + const TVector<TRichYPath>& paths, + const TGetTablePartitionsOptions& options = {}) override; + +private: + struct TBatchItem { + TNode Parameters; + ::TIntrusivePtr<IResponseItemParser> ResponseParser; + TInstant NextTry; + + TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser); + + TBatchItem(const TBatchItem& batchItem, TInstant nextTry); + }; + +private: + template <typename TResponseParser> + typename TResponseParser::TFutureResult AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input); + + template <typename TResponseParser> + typename TResponseParser::TFutureResult AddRequest( + const TString& command, + TNode parameters, + TMaybe<TNode> input, + ::TIntrusivePtr<TResponseParser> parser); + + void AddRequest(TBatchItem batchItem); + +private: + const TClientContext Context_; + + IRequestRetryPolicyPtr RequestRetryPolicy_; + + TDeque<TBatchItem> BatchItemList_; + bool Executed_ = false; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail::NRawClient |