#pragma once #include "raw_batch_request.h" #include #include #include #include #include #include #include namespace NYT { //////////////////////////////////////////////////////////////////////////////// class IRequestRetryPolicy; struct TClientContext; struct TExecuteBatchOptions; //////////////////////////////////////////////////////////////////////////////// namespace NDetail::NRawClient { //////////////////////////////////////////////////////////////////////////////// TOperationAttributes ParseOperationAttributes(const TNode& node); TJobAttributes ParseJobAttributes(const TNode& node); TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); //////////////////////////////////////////////////////////////////////////////// TRichYPath CanonizeYPath( const IRawClientPtr& rawClient, const TRichYPath& path); TVector CanonizeYPaths( const IRawClientPtr& rawClient, const TVector& paths); NHttpClient::IHttpResponsePtr SkyShareTable( const TClientContext& context, const std::vector& tablePaths, const TSkyShareTableOptions& options); std::unique_ptr WriteTable( const TClientContext& context, const TTransactionId& transactionId, const TRichYPath& path, const TMaybe& format, const TTableWriterOptions& options); TAuthorizationInfo WhoAmI(const TClientContext& context); //////////////////////////////////////////////////////////////////////////////// template auto BatchTransform( const IRawClientPtr& rawClient, const TSrc& src, TBatchAdder batchAdder, const TExecuteBatchOptions& executeBatchOptions = {}) { auto batch = rawClient->CreateRawBatchRequest(); using TFuture = decltype(batchAdder(batch, *std::begin(src))); TVector futures; for (const auto& el : src) { futures.push_back(batchAdder(batch, el)); } batch->ExecuteBatch(executeBatchOptions); using TDst = decltype(futures[0].ExtractValueSync()); TVector result; result.reserve(std::size(src)); for (auto& future : futures) { result.push_back(future.ExtractValueSync()); } return result; } template auto RemoteClustersBatchTransform( const IRawClientPtr& rawClient, const TClientContext& context, const TVector& paths, TBatchAdder batchAdder, const TExecuteBatchOptions& options = {}) { // Given inputs from multiple clusters, we need to categorize them based on their cluster names. // We assume the current cluster name is empty. Within each cluster, // gather all related inputs to perform a batch request. std::unordered_map> clusterToPathsIndexes; for (ssize_t index = 0; index < std::ssize(paths); ++index) { const auto& path = paths[index]; auto clusterName = path.Cluster_.GetOrElse(""); clusterToPathsIndexes[clusterName].push_back(index); } std::vector result(paths.size()); for (const auto& [clusterName, pathsIndexes] : clusterToPathsIndexes) { auto newContext = context; if (!clusterName.empty()) { // It is not a current cluster, we switch the cluster context. SetupClusterContext(newContext, clusterName); } auto newRawClient = rawClient->Clone(newContext); TVector pathsBatch; pathsBatch.reserve(pathsIndexes.size()); for (int index : pathsIndexes) { pathsBatch.push_back(paths[index]); } auto batchResult = NRawClient::BatchTransform( newRawClient, NRawClient::CanonizeYPaths(newRawClient, pathsBatch), batchAdder, options); for (ssize_t index = 0; index < std::ssize(pathsIndexes); ++index) { result[pathsIndexes[index]] = batchResult[index]; } } return result; } //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail::NRawClient } // namespace NYT