summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/operation.cpp
diff options
context:
space:
mode:
authorermolovd <[email protected]>2024-11-09 12:51:55 +0300
committerermolovd <[email protected]>2024-11-09 13:01:48 +0300
commit1f59ab019232ff97a73c7c13736b254925fa8b0b (patch)
tree1fa21798279238d1b8a35d5870cd562a9e368e90 /yt/cpp/mapreduce/client/operation.cpp
parentc351a198d988d7a703e9c8b7dcbb19285a96de5e (diff)
add nirvana block url to started by block
commit_hash:0468f946ddc0d3a27c085e6e5ec4ce8121c4024d
Diffstat (limited to 'yt/cpp/mapreduce/client/operation.cpp')
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp147
1 files changed, 94 insertions, 53 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index c94d54d56e7..c96c191bdd7 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -4,9 +4,7 @@
#include "client.h"
#include "operation_helpers.h"
#include "operation_tracker.h"
-#include "transaction.h"
#include "prepare_operation.h"
-#include "retry_heavy_write_request.h"
#include "skiff.h"
#include "structured_table_formats.h"
#include "yt_poller.h"
@@ -50,6 +48,8 @@
#include <util/string/cast.h>
#include <util/system/thread.h>
+#include <util/system/env.h>
+#include <util/system/fs.h>
namespace NYT {
namespace NDetail {
@@ -760,55 +760,95 @@ void BuildUserJobFluently(
.Item("redirect_stdout_to_stderr").Value(preparer.ShouldRedirectStdoutToStderr());
}
+// Might return undefined value.
+TNode GetNirvanaBlockUrlFromContext()
+{
+ auto filePath = TString("/slot/sandbox/j/job_context.json");
+ auto nvYtOperationId = GetEnv("NV_YT_OPERATION_ID");
+ if (nvYtOperationId.empty()) {
+ return {};
+ }
+ if (!NFs::Exists(filePath)) {
+ return {};
+ }
+ NJson::TJsonValue json;
+ try {
+ auto inf = TIFStream(filePath);
+ json = NJson::ReadJsonTree(&inf, /*throwOnError*/ true);
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("Failed to load nirvana job context: %v", ex.what());
+ return {};
+ }
+ const auto* url = json.GetValueByPath("meta.blockURL");
+ if (!url || !url->IsString()) {
+ return {};
+ }
+
+ TNode result = url->GetString();
+ result.Attributes()["_type_tag"] = "url";
+ return result;
+}
+
template <typename T>
-void BuildCommonOperationPart(const TConfigPtr& config, const TOperationSpecBase<T>& baseSpec, const TOperationOptions& options, TFluentMap fluent)
+void BuildCommonOperationPart(
+ const TConfigPtr& config,
+ const TOperationSpecBase<T>& baseSpec,
+ const TOperationOptions& options,
+ TNode* specNode)
{
const TProcessState* properties = TProcessState::Get();
- TString pool = config->Pool;
+ auto& startedBySpec = (*specNode)["started_by"];
+ startedBySpec["hostname"] = properties->FqdnHostName,
+ startedBySpec["pid"] = properties->Pid;
+ startedBySpec["user"] = properties->UserName;
+ startedBySpec["wrapper_version"] = properties->ClientVersion;
+
+ startedBySpec["command"] = TNode::CreateList();
+ for (const auto& arg : properties->CensoredCommandLine) {
+ startedBySpec["command"].Add(arg);
+ }
+ auto nirvanaBlockUrl = GetNirvanaBlockUrlFromContext();
+ if (!nirvanaBlockUrl.IsUndefined()) {
+ startedBySpec["nirvana_block_url"] = nirvanaBlockUrl;
+ }
+
+ TString pool;
if (baseSpec.Pool_) {
pool = *baseSpec.Pool_;
+ } else {
+ pool = config->Pool;
+ }
+ if (!pool.empty()) {
+ (*specNode)["pool"] = pool;
+ }
+ if (baseSpec.Weight_.Defined()) {
+ (*specNode)["weight"] = *baseSpec.Weight_;
+ }
+ if (baseSpec.TimeLimit_.Defined()) {
+ (*specNode)["time_limit"] = baseSpec.TimeLimit_->MilliSeconds();
+ }
+ if (baseSpec.PoolTrees().Defined()) {
+ TNode poolTreesSpec = TNode::CreateList();
+ for (const auto& tree : *baseSpec.PoolTrees()) {
+ poolTreesSpec.Add(tree);
+ }
+ (*specNode)["pool_trees"] = std::move(poolTreesSpec);
+ }
+ if (baseSpec.ResourceLimits().Defined()) {
+ auto resourceLimitsSpec = BuildSchedulerResourcesSpec(*baseSpec.ResourceLimits());
+ if (!resourceLimitsSpec.IsUndefined()) {
+ (*specNode)["resource_limits"] = std::move(resourceLimitsSpec);
+ }
+ }
+ if (options.SecureVault_.Defined()) {
+ Y_ENSURE(options.SecureVault_->IsMap(),
+ "SecureVault must be a map node, got " << options.SecureVault_->GetType());
+ (*specNode)["secure_vault"] = *options.SecureVault_;
+ }
+ if (baseSpec.Title_.Defined()) {
+ (*specNode)["title"] = *baseSpec.Title_;
}
-
- fluent
- .Item("started_by")
- .BeginMap()
- .Item("hostname").Value(properties->FqdnHostName)
- .Item("pid").Value(properties->Pid)
- .Item("user").Value(properties->UserName)
- .Item("command").List(properties->CensoredCommandLine)
- .Item("wrapper_version").Value(properties->ClientVersion)
- .EndMap()
- .DoIf(!pool.empty(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("pool").Value(pool);
- })
- .DoIf(baseSpec.Weight_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("weight").Value(*baseSpec.Weight_);
- })
- .DoIf(baseSpec.TimeLimit_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("time_limit").Value(baseSpec.TimeLimit_->MilliSeconds());
- })
- .DoIf(baseSpec.PoolTrees().Defined(), [&] (TFluentMap fluentMap) {
- TNode poolTreesSpec = TNode::CreateList();
- for (const auto& tree : *baseSpec.PoolTrees()) {
- poolTreesSpec.Add(tree);
- }
- fluentMap.Item("pool_trees").Value(poolTreesSpec);
- })
- .DoIf(baseSpec.ResourceLimits().Defined(), [&] (TFluentMap fluentMap) {
- auto resourceLimitsSpec = BuildSchedulerResourcesSpec(*baseSpec.ResourceLimits());
- if (!resourceLimitsSpec.IsUndefined()) {
- fluentMap.Item("resource_limits").Value(std::move(resourceLimitsSpec));
- }
- })
- .DoIf(options.SecureVault_.Defined(), [&] (TFluentMap fluentMap) {
- Y_ENSURE(options.SecureVault_->IsMap(),
- "SecureVault must be a map node, got " << options.SecureVault_->GetType());
- fluentMap.Item("secure_vault").Value(*options.SecureVault_);
- })
- .DoIf(baseSpec.Title_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("title").Value(*baseSpec.Title_);
- });
}
template <typename TSpec>
@@ -1059,9 +1099,10 @@ void DoExecuteMap(
.DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("ordered").Value(spec.Ordered_.GetRef());
})
- .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
+
specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
if (!preparer->GetContext().Config->TableWriter.Empty()) {
@@ -1190,9 +1231,9 @@ void DoExecuteReduce(
.DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_));
})
- .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
BuildCommonUserOperationPart(spec, &specNode["spec"]);
BuildJobCountOperationPart(spec, &specNode["spec"]);
@@ -1304,9 +1345,9 @@ void DoExecuteJoinReduce(
fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
})
.EndMap()
- .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
BuildCommonUserOperationPart(spec, &specNode["spec"]);
BuildJobCountOperationPart(spec, &specNode["spec"]);
@@ -1494,13 +1535,13 @@ void DoExecuteMapReduce(
.Do([&] (TFluentMap) {
spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName()));
})
- .Do(std::bind(BuildCommonOperationPart<T>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
if (spec.Ordered_) {
specNode["spec"]["ordered"] = *spec.Ordered_;
}
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
BuildCommonUserOperationPart(spec, &specNode["spec"]);
BuildMapJobCountOperationPart(spec, &specNode["spec"]);
BuildPartitionCountOperationPart(spec, &specNode["spec"]);
@@ -1875,9 +1916,9 @@ void ExecuteSort(
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .Do(std::bind(BuildCommonOperationPart<TSortOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
BuildPartitionCountOperationPart(spec, &specNode["spec"]);
BuildPartitionJobCountOperationPart(spec, &specNode["spec"]);
BuildIntermediateDataPart(spec, &specNode["spec"]);
@@ -1927,9 +1968,9 @@ void ExecuteMerge(
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .Do(std::bind(BuildCommonOperationPart<TMergeOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
BuildJobCountOperationPart(spec, &specNode["spec"]);
auto startOperation = [
@@ -1967,9 +2008,9 @@ void ExecuteErase(
.DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
})
- .Do(std::bind(BuildCommonOperationPart<TEraseOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
@@ -2021,9 +2062,9 @@ void ExecuteRemoteCopy(
"doesn't make sense without CopyAttributes == true");
fluent.Item("attribute_keys").List(spec.AttributeKeys_);
})
- .Do(std::bind(BuildCommonOperationPart<TRemoteCopyOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
auto startOperation = [
operation=operation.Get(),
spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options),
@@ -2130,9 +2171,9 @@ void ExecuteVanilla(
TNode specNode = BuildYsonNodeFluently()
.BeginMap().Item("spec").BeginMap()
.Item("tasks").DoMapFor(spec.Tasks_, addTask)
- .Do(std::bind(BuildCommonOperationPart<TVanillaOperationSpec>, preparer->GetContext().Config, spec, options, std::placeholders::_1))
.EndMap().EndMap();
+ BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
BuildCommonUserOperationPart(spec, &specNode["spec"]);
auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () {