aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2025-01-16 00:49:07 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2025-01-16 01:07:01 +0300
commit8dff5c10763551a5b2e82294518d7d95d68bec23 (patch)
tree2cebe415c5a7b7c04b979964c48ec4d36c613d01
parent0395c5087038d4d95d8f64c67e458add6ac040fd (diff)
downloadydb-8dff5c10763551a5b2e82294518d7d95d68bec23.tar.gz
YT-23616: Make implementation of CanonizeYPath method being common
commit_hash:7f3ecc44b4299acc4fc7b0f463eceac61d0b0156
-rw-r--r--yt/cpp/mapreduce/client/client.cpp2
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp34
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp2
-rw-r--r--yt/cpp/mapreduce/client/py_helpers.cpp2
-rw-r--r--yt/cpp/mapreduce/client/skiff.cpp4
-rw-r--r--yt/cpp/mapreduce/client/skiff.h2
-rw-r--r--yt/cpp/mapreduce/client/structured_table_formats.cpp10
-rw-r--r--yt/cpp/mapreduce/client/structured_table_formats.h2
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp19
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h6
10 files changed, 36 insertions, 47 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 9443ea51fc..8e3eed7bdc 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -276,7 +276,7 @@ void TClientBase::Concatenate(
TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
{
- return NRawClient::CanonizeYPath(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path);
+ return NRawClient::CanonizeYPath(RawClient_, path);
}
TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index bfb56b4a73..ef7015a6f1 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -280,8 +280,8 @@ TSimpleOperationIo CreateSimpleOperationIo(
structuredJob,
preparer,
options,
- CanonizeStructuredTableList(preparer.GetContext(), GetStructuredInputs(spec)),
- CanonizeStructuredTableList(preparer.GetContext(), GetStructuredOutputs(spec)),
+ CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredInputs(spec)),
+ CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredOutputs(spec)),
hints,
nodeReaderFormat,
GetColumnsUsedInOperation(spec));
@@ -303,8 +303,8 @@ TSimpleOperationIo CreateSimpleOperationIo(
}
};
- auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs());
- auto outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs());
+ auto inputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetInputs());
+ auto outputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetOutputs());
VerifyHasElements(inputs, "input");
VerifyHasElements(outputs, "output");
@@ -1632,9 +1632,9 @@ void ExecuteMapReduce(
TMapReduceOperationSpec spec = spec_;
TMapReduceOperationIo operationIo;
- auto structuredInputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredInputs());
- auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredMapOutputs());
- auto structuredOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredOutputs());
+ auto structuredInputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredInputs());
+ auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredMapOutputs());
+ auto structuredOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredOutputs());
const bool inferOutputSchema = options.InferOutputSchema_.GetOrElse(preparer->GetContext().Config->InferTableSchema);
@@ -1898,9 +1898,9 @@ void ExecuteRawMapReduce(
YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)",
preparer->GetPreparationId());
TMapReduceOperationIo operationIo;
- operationIo.Inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs());
- operationIo.MapOutputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs());
- operationIo.Outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs());
+ operationIo.Inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetInputs());
+ operationIo.MapOutputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetMapOutputs());
+ operationIo.Outputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetOutputs());
VerifyHasElements(operationIo.Inputs, "inputs");
VerifyHasElements(operationIo.Outputs, "outputs");
@@ -1947,8 +1947,8 @@ void ExecuteSort(
{
YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);
if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
@@ -1996,8 +1996,8 @@ void ExecuteMerge(
{
YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);
if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
@@ -2046,7 +2046,7 @@ void ExecuteErase(
{
YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
+ auto tablePath = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.TablePath_);
TNode specNode = BuildYsonNodeFluently()
.BeginMap()
@@ -2082,8 +2082,8 @@ void ExecuteRemoteCopy(
{
YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)",
preparer->GetPreparationId());
- auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
- auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
+ auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
+ auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);
if (options.CreateOutputTables_) {
CreateOutputTable(*preparer, output);
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index 1e5f81b5d9..c1907a7037 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -394,7 +394,7 @@ TJobPreparer::TJobPreparer(
{
CreateStorage();
- auto cypressFileList = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_);
+ auto cypressFileList = NRawClient::CanonizeYPaths(RawClient_, spec.Files_);
for (const auto& file : cypressFileList) {
UseFileInCypress(file);
diff --git a/yt/cpp/mapreduce/client/py_helpers.cpp b/yt/cpp/mapreduce/client/py_helpers.cpp
index 3072449866..79c372daa9 100644
--- a/yt/cpp/mapreduce/client/py_helpers.cpp
+++ b/yt/cpp/mapreduce/client/py_helpers.cpp
@@ -51,7 +51,7 @@ TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOpe
paths.emplace_back(inputNode.AsString());
}
}
- paths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), paths);
+ paths = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), paths);
TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure()));
for (const auto& path : paths) {
result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path});
diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp
index b89bde7854..113dee0743 100644
--- a/yt/cpp/mapreduce/client/skiff.cpp
+++ b/yt/cpp/mapreduce/client/skiff.cpp
@@ -280,8 +280,6 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
const IRawClientPtr& rawClient,
- const TClientContext& context,
- const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
ENodeReaderFormat nodeReaderFormat,
const TVector<TRichYPath>& tablePaths,
@@ -306,7 +304,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
auto nodes = NRawClient::BatchTransform(
rawClient,
- NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
+ NRawClient::CanonizeYPaths(rawClient, tablePaths),
[&] (IRawBatchRequestPtr batch, const TRichYPath& path) {
auto getOptions = TGetOptions()
.AttributeFilter(
diff --git a/yt/cpp/mapreduce/client/skiff.h b/yt/cpp/mapreduce/client/skiff.h
index 24e85a61b5..5f26dc7656 100644
--- a/yt/cpp/mapreduce/client/skiff.h
+++ b/yt/cpp/mapreduce/client/skiff.h
@@ -60,8 +60,6 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema);
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
const IRawClientPtr& rawClient,
- const TClientContext& context,
- const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
ENodeReaderFormat nodeReaderFormat,
const TVector<TRichYPath>& tablePaths,
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp
index 3d95f93027..9da31dcf14 100644
--- a/yt/cpp/mapreduce/client/structured_table_formats.cpp
+++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp
@@ -122,8 +122,6 @@ namespace NDetail {
NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
const IRawClientPtr& rawClient,
- const TClientContext& context,
- const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
const TVector<TRichYPath>& tables,
const TOperationOptions& options,
@@ -137,8 +135,6 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
}
return CreateSkiffSchemaIfNecessary(
rawClient,
- context,
- clientRetryPolicy,
transactionId,
nodeReaderFormat,
tables,
@@ -214,14 +210,14 @@ TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTableP
return result;
}
-TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList)
+TStructuredJobTableList CanonizeStructuredTableList(const IRawClientPtr& rawClient, const TVector<TStructuredTablePath>& tableList)
{
TVector<TRichYPath> toCanonize;
toCanonize.reserve(tableList.size());
for (const auto& table : tableList) {
toCanonize.emplace_back(table.RichYPath);
}
- const auto canonized = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, context, toCanonize);
+ const auto canonized = NRawClient::CanonizeYPaths(rawClient, toCanonize);
Y_ABORT_UNLESS(canonized.size() == tableList.size());
TStructuredJobTableList result;
@@ -437,8 +433,6 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
}
skiffSchema = TryCreateSkiffSchema(
RawClient_,
- Context_,
- ClientRetryPolicy_,
TransactionId_,
tableList,
OperationOptions_,
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.h b/yt/cpp/mapreduce/client/structured_table_formats.h
index 162630d411..64a44c6f4d 100644
--- a/yt/cpp/mapreduce/client/structured_table_formats.h
+++ b/yt/cpp/mapreduce/client/structured_table_formats.h
@@ -69,7 +69,7 @@ using TStructuredJobTableList = TVector<TStructuredJobTable>;
TString JobTablePathString(const TStructuredJobTable& jobTable);
TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTablePath>& tableList);
-TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList);
+TStructuredJobTableList CanonizeStructuredTableList(const IRawClientPtr& rawClient, const TVector<TStructuredTablePath>& tableList);
TVector<TRichYPath> GetPathList(
const TStructuredJobTableList& tableList,
const TMaybe<TVector<TTableSchema>>& schemaInferenceResult,
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 6ba8a3f084..30b1619b71 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -253,25 +253,26 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
}
TRichYPath CanonizeYPath(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TRichYPath& path)
{
- return CanonizeYPaths(retryPolicy, context, {path}).front();
+ return CanonizeYPaths(rawClient, {path}).front();
}
TVector<TRichYPath> CanonizeYPaths(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TVector<TRichYPath>& paths)
{
- THttpRawBatchRequest batch(context, retryPolicy);
+ auto batch = rawClient->CreateRawBatchRequest();
+
TVector<NThreading::TFuture<TRichYPath>> futures;
futures.reserve(paths.size());
- for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
- futures.push_back(batch.CanonizeYPath(paths[i]));
+ for (const auto& path : paths) {
+ futures.push_back(batch->CanonizeYPath(path));
}
- batch.ExecuteBatch();
+
+ batch->ExecuteBatch();
+
TVector<TRichYPath> result;
result.reserve(futures.size());
for (auto& future : futures) {
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 101985735c..1487624171 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -34,13 +34,11 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node);
////////////////////////////////////////////////////////////////////////////////
TRichYPath CanonizeYPath(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TRichYPath& path);
TVector<TRichYPath> CanonizeYPaths(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
+ const IRawClientPtr& rawClient,
const TVector<TRichYPath>& paths);
NHttpClient::IHttpResponsePtr SkyShareTable(