#pragma once #include "raw_batch_request.h" #include <yt/cpp/mapreduce/common/fwd.h> #include <yt/cpp/mapreduce/client/client.h> #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/interface/client.h> #include <yt/cpp/mapreduce/interface/client_method_options.h> #include <yt/cpp/mapreduce/interface/operation.h> #include <yt/cpp/mapreduce/interface/raw_client.h> namespace NYT { //////////////////////////////////////////////////////////////////////////////// class IRequestRetryPolicy; struct TClientContext; struct TExecuteBatchOptions; //////////////////////////////////////////////////////////////////////////////// namespace NDetail::NRawClient { //////////////////////////////////////////////////////////////////////////////// TOperationAttributes ParseOperationAttributes(const TNode& node); TJobAttributes ParseJobAttributes(const TNode& node); TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); //////////////////////////////////////////////////////////////////////////////// TRichYPath CanonizeYPath( const IRawClientPtr& rawClient, const TRichYPath& path); TVector<TRichYPath> CanonizeYPaths( const IRawClientPtr& rawClient, const TVector<TRichYPath>& paths); NHttpClient::IHttpResponsePtr SkyShareTable( const TClientContext& context, const std::vector<TYPath>& tablePaths, const TSkyShareTableOptions& options); std::unique_ptr<IOutputStream> WriteTable( const TClientContext& context, const TTransactionId& transactionId, const TRichYPath& path, const TMaybe<TFormat>& format, const TTableWriterOptions& options); void InsertRows( const TClientContext& context, const TYPath& path, const TNode::TListType& rows, const TInsertRowsOptions& options); TNode::TListType LookupRows( const TClientContext& context, const TYPath& path, const TNode::TListType& keys, const TLookupRowsOptions& options); void DeleteRows( const TClientContext& context, const TYPath& path, const TNode::TListType& keys, const TDeleteRowsOptions& options); TAuthorizationInfo WhoAmI(const TClientContext& context); //////////////////////////////////////////////////////////////////////////////// template<typename TSrc, typename TBatchAdder> auto BatchTransform( const IRawClientPtr& rawClient, const TSrc& src, TBatchAdder batchAdder, const TExecuteBatchOptions& executeBatchOptions = {}) { auto batch = rawClient->CreateRawBatchRequest(); using TFuture = decltype(batchAdder(batch, *std::begin(src))); TVector<TFuture> futures; for (const auto& el : src) { futures.push_back(batchAdder(batch, el)); } batch->ExecuteBatch(executeBatchOptions); using TDst = decltype(futures[0].ExtractValueSync()); TVector<TDst> result; result.reserve(std::size(src)); for (auto& future : futures) { result.push_back(future.ExtractValueSync()); } return result; } template <typename TBatchAdder> auto RemoteClustersBatchTransform( const IRawClientPtr& rawClient, const TClientContext& context, const TVector<TRichYPath>& paths, TBatchAdder batchAdder, const TExecuteBatchOptions& options = {}) { // Given inputs from multiple clusters, we need to categorize them based on their cluster names. // We assume the current cluster name is empty. Within each cluster, // gather all related inputs to perform a batch request. std::unordered_map<TString, std::vector<int>> clusterToPathsIndexes; for (ssize_t index = 0; index < std::ssize(paths); ++index) { const auto& path = paths[index]; auto clusterName = path.Cluster_.GetOrElse(""); clusterToPathsIndexes[clusterName].push_back(index); } std::vector<TNode> result(paths.size()); for (const auto& [clusterName, pathsIndexes] : clusterToPathsIndexes) { auto newContext = context; if (!clusterName.empty()) { // It is not a current cluster, we switch the cluster context. SetupClusterContext(newContext, clusterName); } auto newRawClient = rawClient->Clone(newContext); TVector<TRichYPath> pathsBatch; pathsBatch.reserve(pathsIndexes.size()); for (int index : pathsIndexes) { pathsBatch.push_back(paths[index]); } auto batchResult = NRawClient::BatchTransform( newRawClient, NRawClient::CanonizeYPaths(newRawClient, pathsBatch), batchAdder, options); for (ssize_t index = 0; index < std::ssize(pathsIndexes); ++index) { result[pathsIndexes[index]] = batchResult[index]; } } return result; } //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail::NRawClient } // namespace NYT