diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-16 00:49:07 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2025-01-16 01:07:01 +0300 |
commit | 8dff5c10763551a5b2e82294518d7d95d68bec23 (patch) | |
tree | 2cebe415c5a7b7c04b979964c48ec4d36c613d01 | |
parent | 0395c5087038d4d95d8f64c67e458add6ac040fd (diff) | |
download | ydb-8dff5c10763551a5b2e82294518d7d95d68bec23.tar.gz |
YT-23616: Make the implementation of the CanonizeYPath method common
commit_hash:7f3ecc44b4299acc4fc7b0f463eceac61d0b0156
-rw-r--r-- | yt/cpp/mapreduce/client/client.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 34 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/py_helpers.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/skiff.cpp | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/skiff.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/structured_table_formats.cpp | 10 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/structured_table_formats.h | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.cpp | 19 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_requests.h | 6 |
10 files changed, 36 insertions, 47 deletions
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 9443ea51fc..8e3eed7bdc 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -276,7 +276,7 @@ void TClientBase::Concatenate( TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path) { - return NRawClient::CanonizeYPath(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path); + return NRawClient::CanonizeYPath(RawClient_, path); } TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics( diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index bfb56b4a73..ef7015a6f1 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -280,8 +280,8 @@ TSimpleOperationIo CreateSimpleOperationIo( structuredJob, preparer, options, - CanonizeStructuredTableList(preparer.GetContext(), GetStructuredInputs(spec)), - CanonizeStructuredTableList(preparer.GetContext(), GetStructuredOutputs(spec)), + CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredInputs(spec)), + CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredOutputs(spec)), hints, nodeReaderFormat, GetColumnsUsedInOperation(spec)); @@ -303,8 +303,8 @@ TSimpleOperationIo CreateSimpleOperationIo( } }; - auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs()); - auto outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs()); + auto inputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetInputs()); + auto outputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetOutputs()); VerifyHasElements(inputs, "input"); VerifyHasElements(outputs, "output"); @@ -1632,9 +1632,9 @@ void ExecuteMapReduce( TMapReduceOperationSpec spec = spec_; TMapReduceOperationIo operationIo; - auto structuredInputs = 
CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredInputs()); - auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredMapOutputs()); - auto structuredOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredOutputs()); + auto structuredInputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredInputs()); + auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredMapOutputs()); + auto structuredOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredOutputs()); const bool inferOutputSchema = options.InferOutputSchema_.GetOrElse(preparer->GetContext().Config->InferTableSchema); @@ -1898,9 +1898,9 @@ void ExecuteRawMapReduce( YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)", preparer->GetPreparationId()); TMapReduceOperationIo operationIo; - operationIo.Inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs()); - operationIo.MapOutputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs()); - operationIo.Outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs()); + operationIo.Inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetInputs()); + operationIo.MapOutputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetMapOutputs()); + operationIo.Outputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetOutputs()); VerifyHasElements(operationIo.Inputs, "inputs"); VerifyHasElements(operationIo.Outputs, "outputs"); @@ -1947,8 +1947,8 @@ void ExecuteSort( { YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, 
preparer->GetContext(), spec.Inputs_); - auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_); if (options.CreateOutputTables_) { CheckInputTablesExist(*preparer, inputs); @@ -1996,8 +1996,8 @@ void ExecuteMerge( { YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_); if (options.CreateOutputTables_) { CheckInputTablesExist(*preparer, inputs); @@ -2046,7 +2046,7 @@ void ExecuteErase( { YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)", preparer->GetPreparationId()); - auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); + auto tablePath = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.TablePath_); TNode specNode = BuildYsonNodeFluently() .BeginMap() @@ -2082,8 +2082,8 @@ void ExecuteRemoteCopy( { YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)", preparer->GetPreparationId()); - auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_); - auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_); + auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_); + auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_); if (options.CreateOutputTables_) { CreateOutputTable(*preparer, output); 
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index 1e5f81b5d9..c1907a7037 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -394,7 +394,7 @@ TJobPreparer::TJobPreparer( { CreateStorage(); - auto cypressFileList = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_); + auto cypressFileList = NRawClient::CanonizeYPaths(RawClient_, spec.Files_); for (const auto& file : cypressFileList) { UseFileInCypress(file); diff --git a/yt/cpp/mapreduce/client/py_helpers.cpp b/yt/cpp/mapreduce/client/py_helpers.cpp index 3072449866..79c372daa9 100644 --- a/yt/cpp/mapreduce/client/py_helpers.cpp +++ b/yt/cpp/mapreduce/client/py_helpers.cpp @@ -51,7 +51,7 @@ TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOpe paths.emplace_back(inputNode.AsString()); } } - paths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), paths); + paths = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), paths); TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure())); for (const auto& path : paths) { result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path}); diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp index b89bde7854..113dee0743 100644 --- a/yt/cpp/mapreduce/client/skiff.cpp +++ b/yt/cpp/mapreduce/client/skiff.cpp @@ -280,8 +280,6 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) { NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( const IRawClientPtr& rawClient, - const TClientContext& context, - const IClientRetryPolicyPtr& clientRetryPolicy, const TTransactionId& transactionId, ENodeReaderFormat nodeReaderFormat, const TVector<TRichYPath>& tablePaths, @@ -306,7 +304,7 @@ NSkiff::TSkiffSchemaPtr 
CreateSkiffSchemaIfNecessary( auto nodes = NRawClient::BatchTransform( rawClient, - NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths), + NRawClient::CanonizeYPaths(rawClient, tablePaths), [&] (IRawBatchRequestPtr batch, const TRichYPath& path) { auto getOptions = TGetOptions() .AttributeFilter( diff --git a/yt/cpp/mapreduce/client/skiff.h b/yt/cpp/mapreduce/client/skiff.h index 24e85a61b5..5f26dc7656 100644 --- a/yt/cpp/mapreduce/client/skiff.h +++ b/yt/cpp/mapreduce/client/skiff.h @@ -60,8 +60,6 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema); NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( const IRawClientPtr& rawClient, - const TClientContext& context, - const IClientRetryPolicyPtr& clientRetryPolicy, const TTransactionId& transactionId, ENodeReaderFormat nodeReaderFormat, const TVector<TRichYPath>& tablePaths, diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp index 3d95f93027..9da31dcf14 100644 --- a/yt/cpp/mapreduce/client/structured_table_formats.cpp +++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp @@ -122,8 +122,6 @@ namespace NDetail { NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema( const IRawClientPtr& rawClient, - const TClientContext& context, - const IClientRetryPolicyPtr& clientRetryPolicy, const TTransactionId& transactionId, const TVector<TRichYPath>& tables, const TOperationOptions& options, @@ -137,8 +135,6 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema( } return CreateSkiffSchemaIfNecessary( rawClient, - context, - clientRetryPolicy, transactionId, nodeReaderFormat, tables, @@ -214,14 +210,14 @@ TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTableP return result; } -TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList) +TStructuredJobTableList CanonizeStructuredTableList(const IRawClientPtr& 
rawClient, const TVector<TStructuredTablePath>& tableList) { TVector<TRichYPath> toCanonize; toCanonize.reserve(tableList.size()); for (const auto& table : tableList) { toCanonize.emplace_back(table.RichYPath); } - const auto canonized = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, context, toCanonize); + const auto canonized = NRawClient::CanonizeYPaths(rawClient, toCanonize); Y_ABORT_UNLESS(canonized.size() == tableList.size()); TStructuredJobTableList result; @@ -437,8 +433,6 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat( } skiffSchema = TryCreateSkiffSchema( RawClient_, - Context_, - ClientRetryPolicy_, TransactionId_, tableList, OperationOptions_, diff --git a/yt/cpp/mapreduce/client/structured_table_formats.h b/yt/cpp/mapreduce/client/structured_table_formats.h index 162630d411..64a44c6f4d 100644 --- a/yt/cpp/mapreduce/client/structured_table_formats.h +++ b/yt/cpp/mapreduce/client/structured_table_formats.h @@ -69,7 +69,7 @@ using TStructuredJobTableList = TVector<TStructuredJobTable>; TString JobTablePathString(const TStructuredJobTable& jobTable); TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTablePath>& tableList); -TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList); +TStructuredJobTableList CanonizeStructuredTableList(const IRawClientPtr& rawClient, const TVector<TStructuredTablePath>& tableList); TVector<TRichYPath> GetPathList( const TStructuredJobTableList& tableList, const TMaybe<TVector<TTableSchema>>& schemaInferenceResult, diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 6ba8a3f084..30b1619b71 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -253,25 +253,26 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node) } TRichYPath CanonizeYPath( - const 
IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TRichYPath& path) { - return CanonizeYPaths(retryPolicy, context, {path}).front(); + return CanonizeYPaths(rawClient, {path}).front(); } TVector<TRichYPath> CanonizeYPaths( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TVector<TRichYPath>& paths) { - THttpRawBatchRequest batch(context, retryPolicy); + auto batch = rawClient->CreateRawBatchRequest(); + TVector<NThreading::TFuture<TRichYPath>> futures; futures.reserve(paths.size()); - for (int i = 0; i < static_cast<int>(paths.size()); ++i) { - futures.push_back(batch.CanonizeYPath(paths[i])); + for (const auto& path : paths) { + futures.push_back(batch->CanonizeYPath(path)); } - batch.ExecuteBatch(); + + batch->ExecuteBatch(); + TVector<TRichYPath> result; result.reserve(futures.size()); for (auto& future : futures) { diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 101985735c..1487624171 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -34,13 +34,11 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node); //////////////////////////////////////////////////////////////////////////////// TRichYPath CanonizeYPath( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TRichYPath& path); TVector<TRichYPath> CanonizeYPaths( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, + const IRawClientPtr& rawClient, const TVector<TRichYPath>& paths); NHttpClient::IHttpResponsePtr SkyShareTable( |