#pragma once #include <yt/cpp/mapreduce/common/fwd.h> #include <yt/cpp/mapreduce/interface/batch_request.h> #include <yt/cpp/mapreduce/interface/fwd.h> #include <yt/cpp/mapreduce/interface/node.h> #include <yt/cpp/mapreduce/interface/retry_policy.h> #include <yt/cpp/mapreduce/http/requests.h> #include <library/cpp/threading/future/future.h> #include <util/generic/ptr.h> #include <util/generic/deque.h> #include <exception> namespace NYT::NDetail { struct TResponseInfo; } namespace NYT::NDetail::NRawClient { //////////////////////////////////////////////////////////////////////////////// class TRawBatchRequest : public TThrRefBase { public: struct IResponseItemParser : public TThrRefBase { ~IResponseItemParser() = default; virtual void SetResponse(TMaybe<TNode> node) = 0; virtual void SetException(std::exception_ptr e) = 0; }; public: TRawBatchRequest(const TConfigPtr& config); ~TRawBatchRequest(); bool IsExecuted() const; void MarkExecuted(); void FillParameterList(size_t maxSize, TNode* result, TInstant* nextTry) const; size_t BatchSize() const; void ParseResponse( const TResponseInfo& requestResult, const IRequestRetryPolicyPtr& retryPolicy, TRawBatchRequest* retryBatch, TInstant now = TInstant::Now()); void ParseResponse( TNode response, const TString& requestId, const IRequestRetryPolicyPtr& retryPolicy, TRawBatchRequest* retryBatch, TInstant now = TInstant::Now()); void SetErrorResult(std::exception_ptr e) const; ::NThreading::TFuture<TNodeId> Create( const TTransactionId& transaction, const TYPath& path, ENodeType type, const TCreateOptions& options); ::NThreading::TFuture<void> Remove( const TTransactionId& transaction, const TYPath& path, const TRemoveOptions& options); ::NThreading::TFuture<bool> Exists( const TTransactionId& transaction, const TYPath& path, const TExistsOptions& options); ::NThreading::TFuture<TNode> Get( const TTransactionId& transaction, const TYPath& path, const TGetOptions& options); ::NThreading::TFuture<void> Set( const TTransactionId& transaction, const TYPath& path, const TNode& value, const TSetOptions& options); ::NThreading::TFuture<TNode::TListType> List( const TTransactionId& transaction, const TYPath& path, const TListOptions& options); ::NThreading::TFuture<TNodeId> Copy( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, const TCopyOptions& options); ::NThreading::TFuture<TNodeId> Move( const TTransactionId& transaction, const TYPath& sourcePath, const TYPath& destinationPath, const TMoveOptions& options); ::NThreading::TFuture<TNodeId> Link( const TTransactionId& transaction, const TYPath& targetPath, const TYPath& linkPath, const TLinkOptions& options); ::NThreading::TFuture<TLockId> Lock( const TTransactionId& transaction, const TYPath& path, ELockMode mode, const TLockOptions& options); ::NThreading::TFuture<void> Unlock( const TTransactionId& transaction, const TYPath& path, const TUnlockOptions& options); ::NThreading::TFuture<TMaybe<TYPath>> GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, const TYPath& cachePath, const TGetFileFromCacheOptions& options); ::NThreading::TFuture<TYPath> PutFileToCache( const TTransactionId& transactionId, const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, const TPutFileToCacheOptions& options); ::NThreading::TFuture<TCheckPermissionResponse> CheckPermission( const TString& user, EPermission permission, const TYPath& path, const TCheckPermissionOptions& options); ::NThreading::TFuture<TOperationAttributes> GetOperation( const TOperationId& operationId, const TGetOperationOptions& options); ::NThreading::TFuture<void> AbortOperation(const TOperationId& operationId); ::NThreading::TFuture<void> CompleteOperation(const TOperationId& operationId); ::NThreading::TFuture<void> SuspendOperation( const TOperationId& operationId, const TSuspendOperationOptions& options); ::NThreading::TFuture<void> ResumeOperation( const TOperationId& operationId, const TResumeOperationOptions& options); ::NThreading::TFuture<void> UpdateOperationParameters( const TOperationId& operationId, const TUpdateOperationParametersOptions& options); ::NThreading::TFuture<TRichYPath> CanonizeYPath(const TRichYPath& path); ::NThreading::TFuture<TVector<TTableColumnarStatistics>> GetTableColumnarStatistics( const TTransactionId& transaction, const TVector<TRichYPath>& paths, const TGetTableColumnarStatisticsOptions& options); ::NThreading::TFuture<TMultiTablePartitions> GetTablePartitions( const TTransactionId& transaction, const TVector<TRichYPath>& paths, const TGetTablePartitionsOptions& options); private: struct TBatchItem { TNode Parameters; ::TIntrusivePtr<IResponseItemParser> ResponseParser; TInstant NextTry; TBatchItem(TNode parameters, ::TIntrusivePtr<IResponseItemParser> responseParser); TBatchItem(const TBatchItem& batchItem, TInstant nextTry); }; private: template <typename TResponseParser> typename TResponseParser::TFutureResult AddRequest( const TString& command, TNode parameters, TMaybe<TNode> input); template <typename TResponseParser> typename TResponseParser::TFutureResult AddRequest( const TString& command, TNode parameters, TMaybe<TNode> input, ::TIntrusivePtr<TResponseParser> parser); void AddRequest(TBatchItem batchItem); private: TConfigPtr Config_; TDeque<TBatchItem> BatchItemList_; bool Executed_ = false; }; //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail::NRawClient