diff options
author | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-24 22:18:54 +0300 |
---|---|---|
committer | hiddenpath <hiddenpath@yandex-team.com> | 2024-12-24 22:37:56 +0300 |
commit | 8fa83915202633143936bfce16cf1c06653e2dce (patch) | |
tree | 49c891bc3ea761cfbec7479a2e169a1bde8231a0 /yt | |
parent | 845baeff50212ec002c6a67e16bdf7c03c997fbf (diff) | |
download | ydb-8fa83915202633143936bfce16cf1c06653e2dce.tar.gz |
YT-23616: Move start_op to THttpRawClient
commit_hash:dea4a6f4cd03d1ce40a117dbe660e14750dc6da5
Diffstat (limited to 'yt')
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 116 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 25 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.h | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/raw_client.h | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.cpp | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/raw_client.h | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp | 16 | ||||
-rw-r--r-- | yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h | 13 |
8 files changed, 118 insertions, 81 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index bff10c3328..9441c7efe2 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -969,9 +969,9 @@ void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec) TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options) { - MergeNodes(dst["spec"], spec); + MergeNodes(dst, spec); if (options.Spec_) { - MergeNodes(dst["spec"], *options.Spec_); + MergeNodes(dst, *options.Spec_); } return dst; } @@ -1129,7 +1129,7 @@ void DoExecuteMap( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("mapper").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( map, @@ -1148,18 +1148,18 @@ void DoExecuteMap( .DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) { fluent.Item("ordered").Value(spec.Ordered_.GetRef()); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); - specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); - specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); + specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); if (!preparer->GetContext().Config->TableWriter.Empty()) { - specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; + specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter; } - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1168,7 +1168,7 @@ void DoExecuteMap( operationIo, mapper ] () { - auto operationId = preparer->StartOperation(operation, "map", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec); LogJob(operationId, mapper.Get(), "mapper"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1247,7 +1247,7 @@ void DoExecuteReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1280,11 +1280,11 @@ void DoExecuteReduce( .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) { fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1293,7 +1293,7 @@ void DoExecuteReduce( operationIo, reducer ] () { - auto operationId = preparer->StartOperation(operation, "reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1373,7 +1373,7 @@ void DoExecuteJoinReduce( spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName())); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("reducer").DoMap([&] (TFluentMap fluent) { BuildUserJobFluently( reduce, @@ -1394,11 +1394,11 @@ void DoExecuteJoinReduce( fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter); }) .EndMap() - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1407,7 +1407,7 @@ void DoExecuteJoinReduce( reducer, operationIo ] () { - auto operationId = preparer->StartOperation(operation, "join_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec); LogJob(operationId, reducer.Get(), "reducer"); LogYPaths(operationId, operationIo.Inputs, "input"); @@ -1505,7 +1505,7 @@ void DoExecuteMapReduce( TString title; TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .DoIf(hasMapper, [&] (TFluentMap fluent) { TJobPreparer map( *preparer, @@ -1584,18 +1584,18 @@ void DoExecuteMapReduce( .Do([&] (TFluentMap) { spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName())); }) - .EndMap().EndMap(); + .EndMap(); if (spec.Ordered_) { - specNode["spec"]["ordered"] = *spec.Ordered_; + specNode["ordered"] = *spec.Ordered_; } - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); - BuildMapJobCountOperationPart(spec, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); - BuildDataSizePerSortJobPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); + BuildMapJobCountOperationPart(spec, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); + BuildDataSizePerSortJobPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1607,7 +1607,7 @@ void DoExecuteMapReduce( inputs=operationIo.Inputs, allOutputs ] () { - auto operationId = preparer->StartOperation(operation, "map_reduce", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec); LogJob(operationId, mapper.Get(), "mapper"); LogJob(operationId, reduceCombiner.Get(), "reduce_combiner"); @@ -1962,19 +1962,19 @@ void ExecuteSort( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("sort_by").Value(spec.SortBy_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildPartitionCountOperationPart(spec, &specNode["spec"]); - BuildPartitionJobCountOperationPart(spec, &specNode["spec"]); - BuildIntermediateDataPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildPartitionCountOperationPart(spec, &specNode); + BuildPartitionJobCountOperationPart(spec, &specNode); + BuildIntermediateDataPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -1983,7 +1983,7 @@ void ExecuteSort( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "sort", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2011,7 +2011,7 @@ void ExecuteMerge( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) .Item("mode").Value(ToString(spec.Mode_)) @@ -2021,10 +2021,10 @@ void ExecuteMerge( .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildJobCountOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildJobCountOperationPart(spec, &specNode); auto startOperation = [ operation=operation.Get(), @@ -2033,7 +2033,7 @@ void ExecuteMerge( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "merge", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2055,22 +2055,22 @@ void ExecuteErase( auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("table_path").Value(tablePath) .Item("combine_chunks").Value(spec.CombineChunks_) .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer, tablePath ] () { - auto operationId = preparer->StartOperation(operation, "erase", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec); LogYPath(operationId, tablePath, "table_path"); @@ -2098,7 +2098,7 @@ void ExecuteRemoteCopy( Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required"); TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("cluster_name").Value(spec.ClusterName_) .Item("input_table_paths").List(inputs) .Item("output_table_path").Value(output) @@ -2115,9 +2115,9 @@ void ExecuteRemoteCopy( "doesn't make sense without CopyAttributes == true"); fluent.Item("attribute_keys").List(spec.AttributeKeys_); }) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options), @@ -2125,7 +2125,7 @@ void ExecuteRemoteCopy( inputs, output ] () { - auto operationId = preparer->StartOperation(operation, "remote_copy", spec); + auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec); LogYPaths(operationId, inputs, "input"); LogYPath(operationId, output, "output"); @@ -2222,15 +2222,15 @@ void ExecuteVanilla( } TNode specNode = BuildYsonNodeFluently() - .BeginMap().Item("spec").BeginMap() + .BeginMap() .Item("tasks").DoMapFor(spec.Tasks_, addTask) - .EndMap().EndMap(); + .EndMap(); - BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); - BuildCommonUserOperationPart(spec, &specNode["spec"]); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode); + BuildCommonUserOperationPart(spec, &specNode); auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () { - auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true); + auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec); return operationId; }; diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index 535f300441..70bd5f8b65 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -177,35 +177,26 @@ const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const TOperationId TOperationPreparer::StartOperation( TOperation* operation, - const TString& operationType, - const TNode& spec, - bool useStartOperationRequest) + EOperationType type, + const TNode& spec) { CheckValidity(); - THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType)); - if (useStartOperationRequest) { - header.AddParameter("operation_type", operationType); - } - header.AddTransactionId(TransactionId_); - header.AddMutationId(); - - auto ysonSpec = NodeToYsonString(spec); - auto responseInfo = RetryRequestWithPolicy( + auto operationId = RequestWithRetry<TOperationId>( ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>( ClientRetryPolicy_->CreatePolicyForStartOperationRequest(), TOperationPtr(operation)), - GetContext(), - header, - ysonSpec); - TOperationId operationId = ParseGuidFromResponse(responseInfo.Response); + [this, &type, &spec] (TMutationId& mutationId) { + return Client_->GetRawClient()->StartOperation(mutationId, TransactionId_, type, spec); + }); + YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)", operationId, GetPreparationId()); YT_LOG_INFO("Operation %v started (%v): %v", operationId, - operationType, + type, GetOperationWebInterfaceUrl(GetContext().ServerName, operationId)); TOperationExecutionTimeTracker::Get()->Start(operationId); diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h index 4ab20eeecd..ef3a790c91 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.h +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -28,9 +28,8 @@ public: TOperationId StartOperation( TOperation* operation, - const TString& operationType, - const TNode& spec, - bool useStartOperationRequest = false); + EOperationType type, + const TNode& spec); const IClientRetryPolicyPtr& GetClientRetryPolicy() const; diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index 4994826863..ff2dafeb6d 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -139,6 +139,12 @@ public: // Operations + virtual TOperationId StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) = 0; + virtual TOperationAttributes GetOperation( const TOperationId& operationId, const TGetOperationOptions& options = {}) = 0; diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp index 65bfa01cea..2868e10f6e 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -289,6 +289,18 @@ void THttpRawClient::CommitTransaction( RequestWithoutRetry(Context_, mutationId, header)->GetResponse(); } +TOperationId THttpRawClient::StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) +{ + THttpHeader header("POST", "start_op"); + header.AddMutationId(); + header.MergeParameters(NRawClient::SerializeParamsForStartOperation(transactionId, type, spec)); + return ParseGuidFromResponse(RequestWithoutRetry(Context_, mutationId, header)->GetResponse()); +} + TOperationAttributes THttpRawClient::GetOperation( const TOperationId& operationId, const TGetOperationOptions& options) diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h index e540d1b331..1b3e274507 100644 --- a/yt/cpp/mapreduce/raw_client/raw_client.h +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -136,6 +136,12 @@ public: // Operations + TOperationId StartOperation( + TMutationId& mutationId, + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) override; + TOperationAttributes GetOperation( const TOperationId& operationId, const TGetOperationOptions& options = {}) override; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp index 2869ddcc0f..98ef5ed099 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.cpp @@ -393,7 +393,21 @@ TNode SerializeParamsForListOperations( return result; } -TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options) +TNode SerializeParamsForStartOperation( + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec) +{ + TNode result; + SetTransactionIdParam(&result, transactionId); + result["operation_type"] = ToString(type); + result["spec"] = spec; + return result; +} + +TNode SerializeParamsForGetOperation( + const std::variant<TString, TOperationId>& aliasOrOperationId, + const TGetOperationOptions& options) { auto includeRuntime = options.IncludeRuntime_; TNode result; diff --git a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h index acbf003b5c..308dcfea64 100644 --- a/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h +++ b/yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h @@ -1,8 +1,10 @@ #pragma once #include <yt/cpp/mapreduce/common/helpers.h> -#include <yt/cpp/mapreduce/interface/fwd.h> + #include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/operation.h> namespace NYT::NDetail::NRawClient { @@ -95,7 +97,14 @@ TNode SerializeParamsForConcatenate( TNode SerializeParamsForPingTx( const TTransactionId& transactionId); -TNode SerializeParamsForGetOperation(const std::variant<TString, TOperationId>& aliasOrOperationId, const TGetOperationOptions& options); +TNode SerializeParamsForStartOperation( + const TTransactionId& transactionId, + EOperationType type, + const TNode& spec); + +TNode SerializeParamsForGetOperation( + const std::variant<TString, TOperationId>& aliasOrOperationId, + const TGetOperationOptions& options); TNode SerializeParamsForAbortOperation( const TOperationId& operationId); |