summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-24 22:18:54 +0300
committerhiddenpath <[email protected]>2024-12-24 22:37:56 +0300
commit8fa83915202633143936bfce16cf1c06653e2dce (patch)
tree49c891bc3ea761cfbec7479a2e169a1bde8231a0 /yt/cpp/mapreduce/client/operation.cpp
parent845baeff50212ec002c6a67e16bdf7c03c997fbf (diff)
YT-23616: Move start_op to THttpRawClient
commit_hash:dea4a6f4cd03d1ce40a117dbe660e14750dc6da5
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp116
1 files changed, 58 insertions, 58 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index bff10c33282..9441c7efe21 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -969,9 +969,9 @@ void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec)
TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options)
{
- MergeNodes(dst["spec"], spec);
+ MergeNodes(dst, spec);
if (options.Spec_) {
- MergeNodes(dst["spec"], *options.Spec_);
+ MergeNodes(dst, *options.Spec_);
}
return dst;
}
@@ -1129,7 +1129,7 @@ void DoExecuteMap(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("mapper").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
map,
@@ -1148,18 +1148,18 @@ void DoExecuteMap(
.DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("ordered").Value(spec.Ordered_.GetRef());
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
- specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
+ specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
+ specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
if (!preparer->GetContext().Config->TableWriter.Empty()) {
- specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
+ specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
}
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1168,7 +1168,7 @@ void DoExecuteMap(
operationIo,
mapper
] () {
- auto operationId = preparer->StartOperation(operation, "map", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec);
LogJob(operationId, mapper.Get(), "mapper");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1247,7 +1247,7 @@ void DoExecuteReduce(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("reducer").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
reduce,
@@ -1280,11 +1280,11 @@ void DoExecuteReduce(
.DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1293,7 +1293,7 @@ void DoExecuteReduce(
operationIo,
reducer
] () {
- auto operationId = preparer->StartOperation(operation, "reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec);
LogJob(operationId, reducer.Get(), "reducer");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1373,7 +1373,7 @@ void DoExecuteJoinReduce(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("reducer").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
reduce,
@@ -1394,11 +1394,11 @@ void DoExecuteJoinReduce(
fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
})
.EndMap()
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1407,7 +1407,7 @@ void DoExecuteJoinReduce(
reducer,
operationIo
] () {
- auto operationId = preparer->StartOperation(operation, "join_reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec);
LogJob(operationId, reducer.Get(), "reducer");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1505,7 +1505,7 @@ void DoExecuteMapReduce(
TString title;
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.DoIf(hasMapper, [&] (TFluentMap fluent) {
TJobPreparer map(
*preparer,
@@ -1584,18 +1584,18 @@ void DoExecuteMapReduce(
.Do([&] (TFluentMap) {
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName()));
})
- .EndMap().EndMap();
+ .EndMap();
if (spec.Ordered_) {
- specNode["spec"]["ordered"] = *spec.Ordered_;
+ specNode["ordered"] = *spec.Ordered_;
}
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildMapJobCountOperationPart(spec, &specNode["spec"]);
- BuildPartitionCountOperationPart(spec, &specNode["spec"]);
- BuildIntermediateDataPart(spec, &specNode["spec"]);
- BuildDataSizePerSortJobPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildMapJobCountOperationPart(spec, &specNode);
+ BuildPartitionCountOperationPart(spec, &specNode);
+ BuildIntermediateDataPart(spec, &specNode);
+ BuildDataSizePerSortJobPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1607,7 +1607,7 @@ void DoExecuteMapReduce(
inputs=operationIo.Inputs,
allOutputs
] () {
- auto operationId = preparer->StartOperation(operation, "map_reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec);
LogJob(operationId, mapper.Get(), "mapper");
LogJob(operationId, reduceCombiner.Get(), "reduce_combiner");
@@ -1962,19 +1962,19 @@ void ExecuteSort(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
.Item("sort_by").Value(spec.SortBy_)
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildPartitionCountOperationPart(spec, &specNode["spec"]);
- BuildPartitionJobCountOperationPart(spec, &specNode["spec"]);
- BuildIntermediateDataPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildPartitionCountOperationPart(spec, &specNode);
+ BuildPartitionJobCountOperationPart(spec, &specNode);
+ BuildIntermediateDataPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1983,7 +1983,7 @@ void ExecuteSort(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "sort", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2011,7 +2011,7 @@ void ExecuteMerge(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
.Item("mode").Value(ToString(spec.Mode_))
@@ -2021,10 +2021,10 @@ void ExecuteMerge(
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -2033,7 +2033,7 @@ void ExecuteMerge(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "merge", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2055,22 +2055,22 @@ void ExecuteErase(
auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("table_path").Value(tablePath)
.Item("combine_chunks").Value(spec.CombineChunks_)
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
preparer,
tablePath
] () {
- auto operationId = preparer->StartOperation(operation, "erase", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec);
LogYPath(operationId, tablePath, "table_path");
@@ -2098,7 +2098,7 @@ void ExecuteRemoteCopy(
Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required");
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("cluster_name").Value(spec.ClusterName_)
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
@@ -2115,9 +2115,9 @@ void ExecuteRemoteCopy(
"doesn't make sense without CopyAttributes == true");
fluent.Item("attribute_keys").List(spec.AttributeKeys_);
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options),
@@ -2125,7 +2125,7 @@ void ExecuteRemoteCopy(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "remote_copy", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2222,15 +2222,15 @@ void ExecuteVanilla(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("tasks").DoMapFor(spec.Tasks_, addTask)
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () {
- auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec);
return operationId;
};