// Implementation of TRpcRawBatchRequest and its nested TSingleRequest template (namespace NYT::NDetail).
//
// NOTE(review): this copy of the file looks flattened onto a few physical lines and appears to have lost
// the text inside angle brackets (include targets, template parameter/argument lists). The code below is
// kept verbatim; restore it from the original source before compiling -- TODO confirm.
//
// TSingleRequest
//   * Constructor: stores the retry policy, creates a fresh promise via ::NThreading::NewPromise() and
//     moves the request callable into Request_.
//   * GetFuture(): returns the promise's future passed through WrapRpcError().
//   * Invoke(): runs RequestWithRetry(RequestRetryPolicy_, Request_). One `if constexpr` branch sets the
//     promise without a value, the other sets it with the value returned by RequestWithRetry. Anything
//     thrown inside the try block is caught with catch (...) and stored in the promise through
//     std::current_exception().
//
// TRpcRawBatchRequest
//   * Constructor: stores the raw client (moved) and the config.
//   * ExecuteBatch(): throws yexception ("Cannot execute batch request since it is already executed") when
//     Executed_ is already set. Otherwise it repeatedly takes Requests_.front(), calls Invoke() on it and
//     pops it until Requests_ is empty. The options argument is unused (its name is commented out).
//     Executed_ is set to true inside a Y_DEFER block -- presumably run on scope exit; confirm against the
//     Y_DEFER definition.
//   * Request builders: every method below wraps exactly one RawClient_ call into a lambda taking
//     TMutationId&, pairs it with CreateDefaultRequestRetryPolicy(Config_) and returns a future for it.
//       - mutationId is forwarded to RawClient_ by: Create, Remove, Set, Link, Lock, Unlock, AbortOperation,
//         CompleteOperation, SuspendOperation, ResumeOperation.
//       - mutationId is ignored by: Exists, Get, List, Copy (calls CopyWithoutRetries), Move (calls
//         MoveWithoutRetries), GetFileFromCache, PutFileToCache, CheckPermission, GetOperation,
//         UpdateOperationParameters, GetTableColumnarStatistics, GetTablePartitions.
//       - All lambdas capture their arguments by copy together with `this` ([=, this]).
//     NOTE(review): Create() takes the future from the request and emplaces the request into Requests_
//     itself, while all other builders return AddRequest(std::move(request)). AddRequest is not defined in
//     this file, so whether the two paths are equivalent needs to be confirmed.
//   * CanonizeYPath(): copies the path; unless Path_ starts with "<" it is prefixed with Config_->Prefix via
//     AddPathPrefix. If the resulting Path_ contains any of the characters "<>{}[]:", a request is queued
//     whose retry policy is constructed with attemptLimit 1; that request parses Path_ with
//     NYPath::TRichYPath::Parse, serializes the parsed path into a TNode, overwrites the node's attributes
//     with those of PathToNode(result), deserializes the node into a TRichYPath, applies AddPathPrefix to
//     its Path_ again and returns it. Otherwise an already-set future holding the prefixed copy is returned
//     via ::NThreading::MakeFuture.
#include "raw_batch_request.h" #include "wrap_rpc_error.h" #include #include #include #include #include namespace NYT::NDetail { using ::NThreading::TFuture; using ::NThreading::TPromise; //////////////////////////////////////////////////////////////////////////////// template TRpcRawBatchRequest::TSingleRequest::TSingleRequest( IRequestRetryPolicyPtr retryPolicy, std::function request) : RequestRetryPolicy_(retryPolicy) , Result_(::NThreading::NewPromise()) , Request_(std::move(request)) { } template TFuture TRpcRawBatchRequest::TSingleRequest::GetFuture() { return WrapRpcError(Result_.GetFuture()); } template void TRpcRawBatchRequest::TSingleRequest::Invoke() { try { if constexpr (std::is_same_v) { RequestWithRetry(RequestRetryPolicy_, Request_); Result_.SetValue(); } else { auto value = RequestWithRetry(RequestRetryPolicy_, Request_); Result_.SetValue(std::move(value)); } } catch (...) { Result_.SetException(std::current_exception()); } } //////////////////////////////////////////////////////////////////////////////// TRpcRawBatchRequest::TRpcRawBatchRequest( IRawClientPtr rawClient, const TConfigPtr& config) : RawClient_(std::move(rawClient)) , Config_(config) { } void TRpcRawBatchRequest::ExecuteBatch(const TExecuteBatchOptions& /*options*/) { if (Executed_) { ythrow yexception() << "Cannot execute batch request since it is already executed"; } Y_DEFER { Executed_ = true; }; while (!Requests_.empty()) { auto& request = Requests_.front(); request->Invoke(); Requests_.pop(); } } TFuture TRpcRawBatchRequest::Create( const TTransactionId& transactionId, const TYPath& path, ENodeType type, const TCreateOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { return RawClient_->Create(mutationId, transactionId, path, type, options); }); auto future = request->GetFuture(); Requests_.emplace(std::move(request)); return future; } TFuture TRpcRawBatchRequest::Remove( const TTransactionId& 
transactionId, const TYPath& path, const TRemoveOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { RawClient_->Remove(mutationId, transactionId, path, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Exists( const TTransactionId& transactionId, const TYPath& path, const TExistsOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->Exists(transactionId, path, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Get( const TTransactionId& transactionId, const TYPath& path, const TGetOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->Get(transactionId, path, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Set( const TTransactionId& transactionId, const TYPath& path, const TNode& value, const TSetOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { RawClient_->Set(mutationId, transactionId, path, value, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::List( const TTransactionId& transactionId, const TYPath& path, const TListOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->List(transactionId, path, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Copy( const TTransactionId& transactionId, const TYPath& sourcePath, const TYPath& destinationPath, const TCopyOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->CopyWithoutRetries(transactionId, 
sourcePath, destinationPath, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Move( const TTransactionId& transactionId, const TYPath& sourcePath, const TYPath& destinationPath, const TMoveOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->MoveWithoutRetries(transactionId, sourcePath, destinationPath, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Link( const TTransactionId& transactionId, const TYPath& targetPath, const TYPath& linkPath, const TLinkOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { return RawClient_->Link(mutationId, transactionId, targetPath, linkPath, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Lock( const TTransactionId& transactionId, const TYPath& path, ELockMode mode, const TLockOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { return RawClient_->Lock(mutationId, transactionId, path, mode, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::Unlock( const TTransactionId& transactionId, const TYPath& path, const TUnlockOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { RawClient_->Unlock(mutationId, transactionId, path, options); }); return AddRequest(std::move(request)); } TFuture> TRpcRawBatchRequest::GetFileFromCache( const TTransactionId& transactionId, const TString& md5Signature, const TYPath& cachePath, const TGetFileFromCacheOptions& options) { auto request = MakeIntrusive>>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->GetFileFromCache(transactionId, md5Signature, cachePath, options); }); return 
AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::PutFileToCache( const TTransactionId& transactionId, const TYPath& filePath, const TString& md5Signature, const TYPath& cachePath, const TPutFileToCacheOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->PutFileToCache(transactionId, filePath, md5Signature, cachePath, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::CheckPermission( const TString& user, EPermission permission, const TYPath& path, const TCheckPermissionOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->CheckPermission(user, permission, path, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::GetOperation( const TOperationId& operationId, const TGetOperationOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->GetOperation(operationId, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::AbortOperation(const TOperationId& operationId) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { RawClient_->AbortOperation(mutationId, operationId); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::CompleteOperation(const TOperationId& operationId) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { RawClient_->CompleteOperation(mutationId, operationId); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::SuspendOperation( const TOperationId& operationId, const TSuspendOperationOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] 
(TMutationId& mutationId) { RawClient_->SuspendOperation(mutationId, operationId, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::ResumeOperation( const TOperationId& operationId, const TResumeOperationOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& mutationId) { RawClient_->ResumeOperation(mutationId, operationId, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::UpdateOperationParameters( const TOperationId& operationId, const TUpdateOperationParametersOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { RawClient_->UpdateOperationParameters(operationId, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::CanonizeYPath(const TRichYPath& path) { TRichYPath result = path; // Out of the symbols in the canonization branch below, only '<' can appear at the beginning of a valid rich YPath. 
if (!result.Path_.StartsWith("<")) { result.Path_ = AddPathPrefix(result.Path_, Config_->Prefix); } if (result.Path_.find_first_of("<>{}[]:") != TString::npos) { auto request = MakeIntrusive>( MakeIntrusive(/*attemptLimit*/ 1u, Config_), [=, this] (TMutationId& /*mutationId*/) { auto richPath = NYPath::TRichYPath::Parse(result.Path_); TNode pathNode; TNodeBuilder builder(&pathNode); Serialize(richPath, &builder); auto originalPathNode = PathToNode(result); for (const auto& [key, value] : originalPathNode.GetAttributes().AsMap()) { pathNode.Attributes()[key] = value; } TRichYPath canonizedPath; Deserialize(canonizedPath, pathNode); canonizedPath.Path_ = AddPathPrefix(canonizedPath.Path_, Config_->Prefix); return canonizedPath; }); return AddRequest(std::move(request)); } return ::NThreading::MakeFuture(result); } TFuture> TRpcRawBatchRequest::GetTableColumnarStatistics( const TTransactionId& transactionId, const TVector& paths, const TGetTableColumnarStatisticsOptions& options) { auto request = MakeIntrusive>>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->GetTableColumnarStatistics(transactionId, paths, options); }); return AddRequest(std::move(request)); } TFuture TRpcRawBatchRequest::GetTablePartitions( const TTransactionId& transactionId, const TVector& paths, const TGetTablePartitionsOptions& options) { auto request = MakeIntrusive>( CreateDefaultRequestRetryPolicy(Config_), [=, this] (TMutationId& /*mutationId*/) { return RawClient_->GetTablePartitions(transactionId, paths, options); }); return AddRequest(std::move(request)); } //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NDetail