diff options
| author | hiddenpath <[email protected]> | 2024-12-24 22:18:54 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-24 22:37:56 +0300 |
| commit | 8fa83915202633143936bfce16cf1c06653e2dce (patch) | |
| tree | 49c891bc3ea761cfbec7479a2e169a1bde8231a0 /yt/cpp/mapreduce/client/operation.cpp | |
| parent | 845baeff50212ec002c6a67e16bdf7c03c997fbf (diff) | |
YT-23616: Move start_op to THttpRawClient
commit_hash:dea4a6f4cd03d1ce40a117dbe660e14750dc6da5
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 116 |
1 files changed, 58 insertions, 58 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index bff10c33282..9441c7efe21 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -969,9 +969,9 @@ void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec) TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options) { - MergeNodes(dst["spec"], spec); + MergeNodes(dst, spec); if (options.Spec_) { - MergeNodes(dst["spec"], *options.Spec_); + MergeNodes(dst, *options.Spec_); } return dst; } @@ -1129,7 +1129,7 @@ void DoExecuteMap( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("mapper").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( map, @@ -1148,18 +1148,18 @@ void DoExecuteMap( .DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) { fluent.Item("ordered").Value(spec.Ordered_.GetRef()); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); - specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); - specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); if (!preparer->GetContext().Config->TableWriter.Empty()) { - specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; + specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; } - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1168,7 +1168,7 @@ void DoExecuteMap( operationIo, mapper ] () { - auto operationId = preparer->StartOperation(operation, "map", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec); LogJob(operationId, mapper.Get(), "mapper"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1247,7 +1247,7 @@ void DoExecuteReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1280,11 +1280,11 @@ void DoExecuteReduce( .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) { fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1293,7 +1293,7 @@ void DoExecuteReduce( operationIo, reducer ] () { - auto operationId = preparer->StartOperation(operation, "reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1373,7 +1373,7 @@ void DoExecuteJoinReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1394,11 +1394,11 @@ void DoExecuteJoinReduce( fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter); }) .EndMap() - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1407,7 +1407,7 @@ void DoExecuteJoinReduce( reducer, operationIo ] () { - auto operationId = preparer->StartOperation(operation, "join_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1505,7 +1505,7 @@ void DoExecuteMapReduce( TString title; TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .DoIf(hasMapper, [&] (TFluentMap fluent) { TJobPreparer map( *preparer, @@ -1584,18 +1584,18 @@ void DoExecuteMapReduce( .Do([&] (TFluentMap) { spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName())); }) - .EndMap().EndMap(); + .EndMap(); if (spec.Ordered_) { - specNode["spec"]["ordered"] = *spec.Ordered_; + specNode["ordered"] = *spec.Ordered_; } - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildMapJobCountOperationPart(spec, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); - BuildDataSizePerSortJobPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildMapJobCountOperationPart(spec, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); + BuildDataSizePerSortJobPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1607,7 +1607,7 @@ void DoExecuteMapReduce( inputs=operationIo.Inputs, allOutputs ] () { - auto operationId = preparer->StartOperation(operation, "map_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec); LogJob(operationId, mapper.Get(), "mapper"); LogJob(operationId, reduceCombiner.Get(), "reduce_combiner"); @@ -1962,19 +1962,19 @@ void ExecuteSort( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("sort_by").Value(spec.SortBy_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildPartitionJobCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildPartitionJobCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1983,7 +1983,7 @@ void ExecuteSort( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "sort", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2011,7 +2011,7 @@ void ExecuteMerge( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("mode").Value(ToString(spec.Mode_)) @@ -2021,10 +2021,10 @@ void ExecuteMerge( .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -2033,7 +2033,7 @@ void ExecuteMerge( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "merge", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2055,22 +2055,22 @@ void ExecuteErase( auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("table_path").Value(tablePath) .Item("combine_chunks").Value(spec.CombineChunks_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer, tablePath ] () { - auto operationId = preparer->StartOperation(operation, "erase", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec); LogYPath(operationId, tablePath, "table_path"); @@ -2098,7 +2098,7 @@ void ExecuteRemoteCopy( Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required"); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("cluster_name").Value(spec.ClusterName_) .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) @@ -2115,9 +2115,9 @@ void ExecuteRemoteCopy( "doesn't make sense without CopyAttributes == true"); fluent.Item("attribute_keys").List(spec.AttributeKeys_); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options), @@ -2125,7 +2125,7 @@ void ExecuteRemoteCopy( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "remote_copy", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2222,15 +2222,15 @@ void ExecuteVanilla( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("tasks").DoMapFor(spec.Tasks_, addTask) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () { - auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true); + auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec); return operationId; }; |
