diff options
| author | ermolovd <[email protected]> | 2024-11-09 12:51:55 +0300 |
|---|---|---|
| committer | ermolovd <[email protected]> | 2024-11-09 13:01:48 +0300 |
| commit | 1f59ab019232ff97a73c7c13736b254925fa8b0b (patch) | |
| tree | 1fa21798279238d1b8a35d5870cd562a9e368e90 /yt/cpp/mapreduce/client/operation.cpp | |
| parent | c351a198d988d7a703e9c8b7dcbb19285a96de5e (diff) | |
add nirvana block url to started by block
commit_hash:0468f946ddc0d3a27c085e6e5ec4ce8121c4024d
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 147 |
1 files changed, 94 insertions, 53 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index c94d54d56e7..c96c191bdd7 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -4,9 +4,7 @@ #include "client.h" #include "operation_helpers.h" #include "operation_tracker.h" -#include "transaction.h" #include "prepare_operation.h" -#include "retry_heavy_write_request.h" #include "skiff.h" #include "structured_table_formats.h" #include "yt_poller.h" @@ -50,6 +48,8 @@ #include <util/string/cast.h> #include <util/system/thread.h> +#include <util/system/env.h> +#include <util/system/fs.h> namespace NYT { namespace NDetail { @@ -760,55 +760,95 @@ void BuildUserJobFluently( .Item("redirect_stdout_to_stderr").Value(preparer.ShouldRedirectStdoutToStderr()); } +// Might return undefined value. +TNode GetNirvanaBlockUrlFromContext() +{ + auto filePath = TString("/slot/sandbox/j/job_context.json"); + auto nvYtOperationId = GetEnv("NV_YT_OPERATION_ID"); + if (nvYtOperationId.empty()) { + return {}; + } + if (!NFs::Exists(filePath)) { + return {}; + } + NJson::TJsonValue json; + try { + auto inf = TIFStream(filePath); + json = NJson::ReadJsonTree(&inf, /*throwOnError*/ true); + } catch (const std::exception& ex) { + YT_LOG_ERROR("Failed to load nirvana job context: %v", ex.what()); + return {}; + } + const auto* url = json.GetValueByPath("meta.blockURL"); + if (!url || !url->IsString()) { + return {}; + } + + TNode result = url->GetString(); + result.Attributes()["_type_tag"] = "url"; + return result; +} + template <typename T> -void BuildCommonOperationPart(const TConfigPtr& config, const TOperationSpecBase<T>& baseSpec, const TOperationOptions& options, TFluentMap fluent) +void BuildCommonOperationPart( + const TConfigPtr& config, + const TOperationSpecBase<T>& baseSpec, + const TOperationOptions& options, + TNode* specNode) { const TProcessState* properties = TProcessState::Get(); - TString pool = config->Pool; + auto& startedBySpec = (*specNode)["started_by"]; + startedBySpec["hostname"] = properties->FqdnHostName, + startedBySpec["pid"] = properties->Pid; + startedBySpec["user"] = properties->UserName; + startedBySpec["wrapper_version"] = properties->ClientVersion; + + startedBySpec["command"] = TNode::CreateList(); + for (const auto& arg : properties->CensoredCommandLine) { + startedBySpec["command"].Add(arg); + } + auto nirvanaBlockUrl = GetNirvanaBlockUrlFromContext(); + if (!nirvanaBlockUrl.IsUndefined()) { + startedBySpec["nirvana_block_url"] = nirvanaBlockUrl; + } + + TString pool; if (baseSpec.Pool_) { pool = *baseSpec.Pool_; + } else { + pool = config->Pool; + } + if (!pool.empty()) { + (*specNode)["pool"] = pool; + } + if (baseSpec.Weight_.Defined()) { + (*specNode)["weight"] = *baseSpec.Weight_; + } + if (baseSpec.TimeLimit_.Defined()) { + (*specNode)["time_limit"] = baseSpec.TimeLimit_->MilliSeconds(); + } + if (baseSpec.PoolTrees().Defined()) { + TNode poolTreesSpec = TNode::CreateList(); + for (const auto& tree : *baseSpec.PoolTrees()) { + poolTreesSpec.Add(tree); + } + (*specNode)["pool_trees"] = std::move(poolTreesSpec); + } + if (baseSpec.ResourceLimits().Defined()) { + auto resourceLimitsSpec = BuildSchedulerResourcesSpec(*baseSpec.ResourceLimits()); + if (!resourceLimitsSpec.IsUndefined()) { + (*specNode)["resource_limits"] = std::move(resourceLimitsSpec); + } + } + if (options.SecureVault_.Defined()) { + Y_ENSURE(options.SecureVault_->IsMap(), + "SecureVault must be a map node, got " << options.SecureVault_->GetType()); + (*specNode)["secure_vault"] = *options.SecureVault_; + } + if (baseSpec.Title_.Defined()) { + (*specNode)["title"] = *baseSpec.Title_; } - - fluent - .Item("started_by") - .BeginMap() - .Item("hostname").Value(properties->FqdnHostName) - .Item("pid").Value(properties->Pid) - .Item("user").Value(properties->UserName) - .Item("command").List(properties->CensoredCommandLine) - .Item("wrapper_version").Value(properties->ClientVersion) - .EndMap() - .DoIf(!pool.empty(), [&] (TFluentMap fluentMap) { - fluentMap.Item("pool").Value(pool); - }) - .DoIf(baseSpec.Weight_.Defined(), [&] (TFluentMap fluentMap) { - fluentMap.Item("weight").Value(*baseSpec.Weight_); - }) - .DoIf(baseSpec.TimeLimit_.Defined(), [&] (TFluentMap fluentMap) { - fluentMap.Item("time_limit").Value(baseSpec.TimeLimit_->MilliSeconds()); - }) - .DoIf(baseSpec.PoolTrees().Defined(), [&] (TFluentMap fluentMap) { - TNode poolTreesSpec = TNode::CreateList(); - for (const auto& tree : *baseSpec.PoolTrees()) { - poolTreesSpec.Add(tree); - } - fluentMap.Item("pool_trees").Value(poolTreesSpec); - }) - .DoIf(baseSpec.ResourceLimits().Defined(), [&] (TFluentMap fluentMap) { - auto resourceLimitsSpec = BuildSchedulerResourcesSpec(*baseSpec.ResourceLimits()); - if (!resourceLimitsSpec.IsUndefined()) { - fluentMap.Item("resource_limits").Value(std::move(resourceLimitsSpec)); - } - }) - .DoIf(options.SecureVault_.Defined(), [&] (TFluentMap fluentMap) { - Y_ENSURE(options.SecureVault_->IsMap(), - "SecureVault must be a map node, got " << options.SecureVault_->GetType()); - fluentMap.Item("secure_vault").Value(*options.SecureVault_); - }) - .DoIf(baseSpec.Title_.Defined(), [&] (TFluentMap fluentMap) { - fluentMap.Item("title").Value(*baseSpec.Title_); - }); } template <typename TSpec> @@ -1059,9 +1099,10 @@ void DoExecuteMap( .DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) { fluent.Item("ordered").Value(spec.Ordered_.GetRef()); }) - .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); + specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true); specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true); if (!preparer->GetContext().Config->TableWriter.Empty()) { @@ -1190,9 +1231,9 @@ void DoExecuteReduce( .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) { fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_)); }) - .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); BuildCommonUserOperationPart(spec, &specNode["spec"]); BuildJobCountOperationPart(spec, &specNode["spec"]); @@ -1304,9 +1345,9 @@ void DoExecuteJoinReduce( fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter); }) .EndMap() - .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); BuildCommonUserOperationPart(spec, &specNode["spec"]); BuildJobCountOperationPart(spec, &specNode["spec"]); @@ -1494,13 +1535,13 @@ void DoExecuteMapReduce( .Do([&] (TFluentMap) { spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName())); }) - .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); if (spec.Ordered_) { specNode["spec"]["ordered"] = *spec.Ordered_; } + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); BuildCommonUserOperationPart(spec, &specNode["spec"]); BuildMapJobCountOperationPart(spec, &specNode["spec"]); BuildPartitionCountOperationPart(spec, &specNode["spec"]); @@ -1875,9 +1916,9 @@ void ExecuteSort( .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .Do(std::bind(BuildCommonOperationPart<TSortOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); BuildPartitionCountOperationPart(spec, &specNode["spec"]); BuildPartitionJobCountOperationPart(spec, &specNode["spec"]); BuildIntermediateDataPart(spec, &specNode["spec"]); @@ -1927,9 +1968,9 @@ void ExecuteMerge( .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .Do(std::bind(BuildCommonOperationPart<TMergeOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); BuildJobCountOperationPart(spec, &specNode["spec"]); auto startOperation = [ @@ -1967,9 +2008,9 @@ void ExecuteErase( .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) { fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_)); }) - .Do(std::bind(BuildCommonOperationPart<TEraseOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), @@ -2021,9 +2062,9 @@ void ExecuteRemoteCopy( "doesn't make sense without CopyAttributes == true"); fluent.Item("attribute_keys").List(spec.AttributeKeys_); }) - .Do(std::bind(BuildCommonOperationPart<TRemoteCopyOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); auto startOperation = [ operation=operation.Get(), spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options), @@ -2130,9 +2171,9 @@ void ExecuteVanilla( TNode specNode = BuildYsonNodeFluently() .BeginMap().Item("spec").BeginMap() .Item("tasks").DoMapFor(spec.Tasks_, addTask) - .Do(std::bind(BuildCommonOperationPart<TVanillaOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1)) .EndMap().EndMap(); + BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]); BuildCommonUserOperationPart(spec, &specNode["spec"]); auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () { |
