aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-15 23:25:11 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-15 23:42:58 +0300
commit183b86950af6daa68f57eb29b6b8ef252d5b3a15 (patch)
treea69cd2da925c2c55c2df636c8cbe459bd4159536
parentabccd55428243410fdc9ab1c84039549a6d3ae06 (diff)
downloadydb-183b86950af6daa68f57eb29b6b8ef252d5b3a15.tar.gz
YT-23616: Make BatchTransform implementation being common
commit_hash:4191c9aa7cde449475eddf88d8c04e1ebf0b8ad9
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp9
-rw-r--r--yt/cpp/mapreduce/client/skiff.cpp8
-rw-r--r--yt/cpp/mapreduce/client/skiff.h1
-rw-r--r--yt/cpp/mapreduce/client/structured_table_formats.cpp3
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h8
5 files changed, 16 insertions, 13 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index 5caf302358..bfb56b4a73 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -225,11 +225,10 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
}
auto isDynamic = NRawClient::BatchTransform(
- CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
- preparer.GetContext(),
+ preparer.GetClient()->GetRawClient(),
tableList,
- [&] (IRawBatchRequest& batch, const auto& table) {
- return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
+ [&] (IRawBatchRequestPtr batch, const auto& table) {
+ return batch->Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
});
auto newTableList = tableList;
@@ -642,7 +641,7 @@ TNode BuildAutoMergeSpec(const TAutoMergeSpec& options)
return result;
}
-TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec)
+[[maybe_unused]] TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec)
{
TNode result;
if (profilerSpec.ProfilingBinary_) {
diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp
index 50ef7793ba..b89bde7854 100644
--- a/yt/cpp/mapreduce/client/skiff.cpp
+++ b/yt/cpp/mapreduce/client/skiff.cpp
@@ -279,6 +279,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
}
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
@@ -304,10 +305,9 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
}
auto nodes = NRawClient::BatchTransform(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
+ rawClient,
NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
- [&] (IRawBatchRequest& batch, const TRichYPath& path) {
+ [&] (IRawBatchRequestPtr batch, const TRichYPath& path) {
auto getOptions = TGetOptions()
.AttributeFilter(
TAttributeFilter()
@@ -315,7 +315,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
.AddAttribute("dynamic")
.AddAttribute("type")
);
- return batch.Get(transactionId, path.Path_, getOptions);
+ return batch->Get(transactionId, path.Path_, getOptions);
});
TVector<NSkiff::TSkiffSchemaPtr> schemas;
diff --git a/yt/cpp/mapreduce/client/skiff.h b/yt/cpp/mapreduce/client/skiff.h
index 82d80a4967..24e85a61b5 100644
--- a/yt/cpp/mapreduce/client/skiff.h
+++ b/yt/cpp/mapreduce/client/skiff.h
@@ -59,6 +59,7 @@ void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node);
TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema);
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp
index 72b76e6b80..3d95f93027 100644
--- a/yt/cpp/mapreduce/client/structured_table_formats.cpp
+++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp
@@ -121,6 +121,7 @@ namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
+ const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
@@ -135,6 +136,7 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
return nullptr;
}
return CreateSkiffSchemaIfNecessary(
+ rawClient,
context,
clientRetryPolicy,
transactionId,
@@ -434,6 +436,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
tableList.emplace_back(*table.RichYPath);
}
skiffSchema = TryCreateSkiffSchema(
+ RawClient_,
Context_,
ClientRetryPolicy_,
TransactionId_,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 9b37293b12..101985735c 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -9,6 +9,7 @@
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/client_method_options.h>
#include <yt/cpp/mapreduce/interface/operation.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
namespace NYT {
@@ -53,19 +54,18 @@ TAuthorizationInfo WhoAmI(const TClientContext& context);
template<typename TSrc, typename TBatchAdder>
auto BatchTransform(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TSrc& src,
TBatchAdder batchAdder,
const TExecuteBatchOptions& executeBatchOptions = {})
{
- THttpRawBatchRequest batch(context, retryPolicy);
+ auto batch = rawClient->CreateRawBatchRequest();
using TFuture = decltype(batchAdder(batch, *std::begin(src)));
TVector<TFuture> futures;
for (const auto& el : src) {
futures.push_back(batchAdder(batch, el));
}
- batch.ExecuteBatch(executeBatchOptions);
+ batch->ExecuteBatch(executeBatchOptions);
using TDst = decltype(futures[0].ExtractValueSync());
TVector<TDst> result;
result.reserve(std::size(src));