aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2024-12-24 22:18:54 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2024-12-24 22:37:56 +0300
commit8fa83915202633143936bfce16cf1c06653e2dce (patch)
tree49c891bc3ea761cfbec7479a2e169a1bde8231a0 /yt
parent845baeff50212ec002c6a67e16bdf7c03c997fbf (diff)
downloadydb-8fa83915202633143936bfce16cf1c06653e2dce.tar.gz
YT-23616: Move start_op to THttpRawClient
commit_hash:dea4a6f4cd03d1ce40a117dbe660e14750dc6da5
Diffstat (limited to 'yt')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp116
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp25
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.h5
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp12
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h6
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp16
-rw-r--r--yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h13
8 files changed, 118 insertions, 81 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index bff10c3328..9441c7efe2 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -969,9 +969,9 @@ void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec)
TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options)
{
- MergeNodes(dst["spec"], spec);
+ MergeNodes(dst, spec);
if (options.Spec_) {
- MergeNodes(dst["spec"], *options.Spec_);
+ MergeNodes(dst, *options.Spec_);
}
return dst;
}
@@ -1129,7 +1129,7 @@ void DoExecuteMap(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("mapper").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
map,
@@ -1148,18 +1148,18 @@ void DoExecuteMap(
.DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("ordered").Value(spec.Ordered_.GetRef());
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
- specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
+ specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
+ specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
if (!preparer->GetContext().Config->TableWriter.Empty()) {
- specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
+ specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
}
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1168,7 +1168,7 @@ void DoExecuteMap(
operationIo,
mapper
] () {
- auto operationId = preparer->StartOperation(operation, "map", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec);
LogJob(operationId, mapper.Get(), "mapper");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1247,7 +1247,7 @@ void DoExecuteReduce(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("reducer").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
reduce,
@@ -1280,11 +1280,11 @@ void DoExecuteReduce(
.DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1293,7 +1293,7 @@ void DoExecuteReduce(
operationIo,
reducer
] () {
- auto operationId = preparer->StartOperation(operation, "reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec);
LogJob(operationId, reducer.Get(), "reducer");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1373,7 +1373,7 @@ void DoExecuteJoinReduce(
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("reducer").DoMap([&] (TFluentMap fluent) {
BuildUserJobFluently(
reduce,
@@ -1394,11 +1394,11 @@ void DoExecuteJoinReduce(
fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
})
.EndMap()
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1407,7 +1407,7 @@ void DoExecuteJoinReduce(
reducer,
operationIo
] () {
- auto operationId = preparer->StartOperation(operation, "join_reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec);
LogJob(operationId, reducer.Get(), "reducer");
LogYPaths(operationId, operationIo.Inputs, "input");
@@ -1505,7 +1505,7 @@ void DoExecuteMapReduce(
TString title;
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.DoIf(hasMapper, [&] (TFluentMap fluent) {
TJobPreparer map(
*preparer,
@@ -1584,18 +1584,18 @@ void DoExecuteMapReduce(
.Do([&] (TFluentMap) {
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName()));
})
- .EndMap().EndMap();
+ .EndMap();
if (spec.Ordered_) {
- specNode["spec"]["ordered"] = *spec.Ordered_;
+ specNode["ordered"] = *spec.Ordered_;
}
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
- BuildMapJobCountOperationPart(spec, &specNode["spec"]);
- BuildPartitionCountOperationPart(spec, &specNode["spec"]);
- BuildIntermediateDataPart(spec, &specNode["spec"]);
- BuildDataSizePerSortJobPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
+ BuildMapJobCountOperationPart(spec, &specNode);
+ BuildPartitionCountOperationPart(spec, &specNode);
+ BuildIntermediateDataPart(spec, &specNode);
+ BuildDataSizePerSortJobPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1607,7 +1607,7 @@ void DoExecuteMapReduce(
inputs=operationIo.Inputs,
allOutputs
] () {
- auto operationId = preparer->StartOperation(operation, "map_reduce", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec);
LogJob(operationId, mapper.Get(), "mapper");
LogJob(operationId, reduceCombiner.Get(), "reduce_combiner");
@@ -1962,19 +1962,19 @@ void ExecuteSort(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
.Item("sort_by").Value(spec.SortBy_)
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildPartitionCountOperationPart(spec, &specNode["spec"]);
- BuildPartitionJobCountOperationPart(spec, &specNode["spec"]);
- BuildIntermediateDataPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildPartitionCountOperationPart(spec, &specNode);
+ BuildPartitionJobCountOperationPart(spec, &specNode);
+ BuildIntermediateDataPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -1983,7 +1983,7 @@ void ExecuteSort(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "sort", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2011,7 +2011,7 @@ void ExecuteMerge(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
.Item("mode").Value(ToString(spec.Mode_))
@@ -2021,10 +2021,10 @@ void ExecuteMerge(
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildJobCountOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildJobCountOperationPart(spec, &specNode);
auto startOperation = [
operation=operation.Get(),
@@ -2033,7 +2033,7 @@ void ExecuteMerge(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "merge", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2055,22 +2055,22 @@ void ExecuteErase(
auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("table_path").Value(tablePath)
.Item("combine_chunks").Value(spec.CombineChunks_)
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
preparer,
tablePath
] () {
- auto operationId = preparer->StartOperation(operation, "erase", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec);
LogYPath(operationId, tablePath, "table_path");
@@ -2098,7 +2098,7 @@ void ExecuteRemoteCopy(
Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required");
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("cluster_name").Value(spec.ClusterName_)
.Item("input_table_paths").List(inputs)
.Item("output_table_path").Value(output)
@@ -2115,9 +2115,9 @@ void ExecuteRemoteCopy(
"doesn't make sense without CopyAttributes == true");
fluent.Item("attribute_keys").List(spec.AttributeKeys_);
})
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options),
@@ -2125,7 +2125,7 @@ void ExecuteRemoteCopy(
inputs,
output
] () {
- auto operationId = preparer->StartOperation(operation, "remote_copy", spec);
+ auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec);
LogYPaths(operationId, inputs, "input");
LogYPath(operationId, output, "output");
@@ -2222,15 +2222,15 @@ void ExecuteVanilla(
}
TNode specNode = BuildYsonNodeFluently()
- .BeginMap().Item("spec").BeginMap()
+ .BeginMap()
.Item("tasks").DoMapFor(spec.Tasks_, addTask)
- .EndMap().EndMap();
+ .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
- BuildCommonUserOperationPart(spec, &specNode["spec"]);
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
+ BuildCommonUserOperationPart(spec, &specNode);
auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () {
- auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true);
+ auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec);
return operationId;
};
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index 535f300441..70bd5f8b65 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -177,35 +177,26 @@ const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
TOperationId TOperationPreparer::StartOperation(
TOperation* operation,
- const TString& operationType,
- const TNode& spec,
- bool useStartOperationRequest)
+ EOperationType type,
+ const TNode& spec)
{
CheckValidity();
- THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType));
- if (useStartOperationRequest) {
- header.AddParameter("operation_type", operationType);
- }
- header.AddTransactionId(TransactionId_);
- header.AddMutationId();
-
- auto ysonSpec = NodeToYsonString(spec);
- auto responseInfo = RetryRequestWithPolicy(
+ auto operationId = RequestWithRetry<TOperationId>(
::MakeIntrusive<TOperationForwardingRequestRetryPolicy>(
ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
TOperationPtr(operation)),
- GetContext(),
- header,
- ysonSpec);
- TOperationId operationId = ParseGuidFromResponse(responseInfo.Response);
+ [this, &type, &spec] (TMutationId& mutationId) {
+ return Client_->GetRawClient()->StartOperation(mutationId, TransactionId_, type, spec);
+ });
+
YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
operationId,
GetPreparationId());
YT_LOG_INFO("Operation %v started (%v): %v",
operationId,
- operationType,
+ type,
GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));
TOperationExecutionTimeTracker::Get()->Start(operationId);
diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h
index 4ab20eeecd..ef3a790c91 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.h
+++ b/yt/cpp/mapreduce/client/operation_preparer.h
@@ -28,9 +28,8 @@ public:
TOperationId StartOperation(
TOperation* operation,
- const TString& operationType,
- const TNode& spec,
- bool useStartOperationRequest = false);
+ EOperationType type,
+ const TNode& spec);
const IClientRetryPolicyPtr& GetClientRetryPolicy() const;
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
index 4994826863..ff2dafeb6d 100644
--- a/yt/cpp/mapreduce/interface/raw_client.h
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -139,6 +139,12 @@ public:
// Operations
+ virtual TOperationId StartOperation(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec) = 0;
+
virtual TOperationAttributes GetOperation(
const TOperationId& operationId,
const TGetOperationOptions& options = {}) = 0;
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 65bfa01cea..2868e10f6e 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -289,6 +289,18 @@ void THttpRawClient::CommitTransaction(
RequestWithoutRetry(Context_, mutationId, header)->GetResponse();
}
+TOperationId THttpRawClient::StartOperation(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec)
+{
+ THttpHeader header("POST", "start_op");
+ header.AddMutationId();
+ header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec));
+ return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse());
+}
+
TOperationAttributes THttpRawClient::GetOperation(
const TOperationId& operationId,
const TGetOperationOptions& options)
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
index e540d1b331..1b3e274507 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.h
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -136,6 +136,12 @@ public:
// Operations
+ TOperationId StartOperation(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec) override;
+
TOperationAttributes GetOperation(
const TOperationId& operationId,
const TGetOperationOptions& options = {}) override;
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
index 2869ddcc0f..98ef5ed099 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp
@@ -393,7 +393,21 @@ TNode SerializeParamsForListOperations(
return result;
}
-TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options)
+TNode SerializeParamsForStartOperation(
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec)
+{
+ TNode result;
+ SetTransactionIdParam(&result, transactionId);
+ result["operation_type"] = ToString(type);
+ result["spec"] = spec;
+ return result;
+}
+
+TNode SerializeParamsForGetOperation(
+ const std::variant<TString, TOperationId>& aliasOrOperationId,
+ const TGetOperationOptions& options)
{
auto includeRuntime = options.IncludeRuntime_;
TNode result;
diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
index acbf003b5c..308dcfea64 100644
--- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
+++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h
@@ -1,8 +1,10 @@
#pragma once
#include <yt/cpp/mapreduce/common/helpers.h>
-#include <yt/cpp/mapreduce/interface/fwd.h>
+
#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
namespace NYT::NDetail::NRawClient {
@@ -95,7 +97,14 @@ TNode SerializeParamsForConcatenate(
TNode SerializeParamsForPingTx(
const TTransactionId& transactionId);
-TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options);
+TNode SerializeParamsForStartOperation(
+ const TTransactionId& transactionId,
+ EOperationType type,
+ const TNode& spec);
+
+TNode SerializeParamsForGetOperation(
+ const std::variant<TString, TOperationId>& aliasOrOperationId,
+ const TGetOperationOptions& options);
TNode SerializeParamsForAbortOperation(
const TOperationId& operationId);