diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 23:25:11 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-15 23:42:58 +0300 |
commit | 183b86950af6daa68f57eb29b6b8ef252d5b3a15 (patch) | |
tree | a69cd2da925c2c55c2df636c8cbe459bd4159536 | |
parent | abccd55428243410fdc9ab1c84039549a6d3ae06 (diff) | |
download | ydb-183b86950af6daa68f57eb29b6b8ef252d5b3a15.tar.gz |
YT-23616: Make BatchTransform implementation being common
commit_hash:4191c9aa7cde449475eddf88d8c04e1ebf0b8ad9
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/skiff.cpp | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/skiff.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/structured_table_formats.cpp | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 8 |
5 files changed, 16 insertions, 13 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index 5caf302358..bfb56b4a73 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -225,11 +225,10 @@ TStructuredJobTableList ApplyProtobufColumnFilters( } auto isDynamic = NRawClient::BatchTransform( - CreateDefaultRequestRetryPolicy(preparer.GetContext().Config), - preparer.GetContext(), + preparer.GetClient()->GetRawClient(), tableList, - [&] (IRawBatchRequest& batch, const auto& table) { - return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions()); + [&] (IRawBatchRequestPtr batch, const auto& table) { + return batch->Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions()); }); auto newTableList = tableList; @@ -642,7 +641,7 @@ TNode BuildAutoMergeSpec(const TAutoMergeSpec& options) return result; } -TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec) +[[maybe_unused]] TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec) { TNode result; if (profilerSpec.ProfilingBinary_) { diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp index 50ef7793ba..b89bde7854 100644 --- a/yt/cpp/mapreduce/client/skiff.cpp +++ b/yt/cpp/mapreduce/client/skiff.cpp @@ -279,6 +279,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) { } NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( + const IRawClientPtr& rawClient, const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy, const TTransactionId& transactionId, @@ -304,10 +305,9 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( } auto nodes = NRawClient::BatchTransform( - clientRetryPolicy->CreatePolicyForGenericRequest(), - context, + rawClient, NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths), - [&] (IRawBatchRequest& batch, const TRichYPath& path) { + [&] (IRawBatchRequestPtr batch, const TRichYPath& path) { auto getOptions = TGetOptions() .AttributeFilter( TAttributeFilter() @@ -315,7 +315,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( .AddAttribute("dynamic") .AddAttribute("type") ); - return batch.Get(transactionId, path.Path_, getOptions); + return batch->Get(transactionId, path.Path_, getOptions); }); TVector<NSkiff::TSkiffSchemaPtr> schemas; diff --git a/yt/cpp/mapreduce/client/skiff.h b/yt/cpp/mapreduce/client/skiff.h index 82d80a4967..24e85a61b5 100644 --- a/yt/cpp/mapreduce/client/skiff.h +++ b/yt/cpp/mapreduce/client/skiff.h @@ -59,6 +59,7 @@ void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node); TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema); NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( + const IRawClientPtr& rawClient, const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy, const TTransactionId& transactionId, diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp index 72b76e6b80..3d95f93027 100644 --- a/yt/cpp/mapreduce/client/structured_table_formats.cpp +++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp @@ -121,6 +121,7 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////////// NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema( + const IRawClientPtr& rawClient, const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy, const TTransactionId& transactionId, @@ -135,6 +136,7 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema( return nullptr; } return CreateSkiffSchemaIfNecessary( + rawClient, context, clientRetryPolicy, transactionId, @@ -434,6 +436,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat( tableList.emplace_back(*table.RichYPath); } skiffSchema = TryCreateSkiffSchema( + RawClient_, Context_, ClientRetryPolicy_, TransactionId_, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 9b37293b12..101985735c 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -9,6 +9,7 @@ #include <yt/cpp/mapreduce/interface/client.h> #include <yt/cpp/mapreduce/interface/client_method_options.h> #include <yt/cpp/mapreduce/interface/operation.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> namespace NYT { @@ -53,19 +54,18 @@ TAuthorizationInfo WhoAmI(const TClientContext& context); template<typename TSrc, typename TBatchAdder> auto BatchTransform( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TSrc& src, TBatchAdder batchAdder, const TExecuteBatchOptions& executeBatchOptions = {}) { - THttpRawBatchRequest batch(context, retryPolicy); + auto batch = rawClient->CreateRawBatchRequest(); using TFuture = decltype(batchAdder(batch, *std::begin(src))); TVector<TFuture> futures; for (const auto& el : src) { futures.push_back(batchAdder(batch, el)); } - batch.ExecuteBatch(executeBatchOptions); + batch->ExecuteBatch(executeBatchOptions); using TDst = decltype(futures[0].ExtractValueSync()); TVector<TDst> result; result.reserve(std::size(src)); |