diff options
author | apachee <apachee@yandex-team.com> | 2024-05-24 20:47:38 +0300 |
---|---|---|
committer | apachee <apachee@yandex-team.com> | 2024-05-24 20:58:06 +0300 |
commit | 3e42aa91c70358fdd3ec39d0ac23072fc230b209 (patch) | |
tree | 422612a54143006d874a3997b1ef10efe34da552 | |
parent | b45547676013a5d43cd0072a85643326e686b6fc (diff) | |
download | ydb-3e42aa91c70358fdd3ec39d0ac23072fc230b209.tar.gz |
YT-21308: Add redirect_stdout_to_stderr flag for C++ client
Adds redirect_stdout_to_stderr spec option for operations that allows writing to stdout as if it was stderr.
6a8ac5f21955a79848d86f72715628c7b8bb65c4
-rw-r--r-- | yt/cpp/mapreduce/client/operation.cpp | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.cpp | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/operation_preparer.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/config.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/helpers.cpp | 18 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/helpers.h | 11 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/operation.cpp | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/ya.make | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/job_writer.cpp | 8 |
9 files changed, 53 insertions, 4 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp index ade0f339b7..496b2f5723 100644 --- a/yt/cpp/mapreduce/client/operation.cpp +++ b/yt/cpp/mapreduce/client/operation.cpp @@ -756,7 +756,8 @@ void BuildUserJobFluently( .DoFor(userJobSpec.JobProfilers_, [&] (TFluentList list, const auto& jobProfiler) { list.Item().Value(BuildJobProfilerSpec(jobProfiler)); }) - .EndList(); + .EndList() + .Item("redirect_stdout_to_stderr").Value(preparer.ShouldRedirectStdoutToStderr()); } template <typename T> diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index ec822e607e..181d8def03 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -419,6 +419,7 @@ TJobPreparer::TJobPreparer( } if (auto commandJob = dynamic_cast<const ICommandJob*>(&job)) { + IsCommandJob_ = true; ClassName_ = TJobFactory::Get()->GetJobName(&job); Command_ = commandJob->GetCommand(); } else { @@ -465,6 +466,11 @@ ui64 TJobPreparer::GetTotalFileSize() const return TotalFileSize_; } +bool TJobPreparer::ShouldRedirectStdoutToStderr() const +{ + return !IsCommandJob_ && OperationPreparer_.GetContext().Config->RedirectStdoutToStderr; +} + TString TJobPreparer::GetFileStorage() const { return Options_.FileStorage_ ? diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h index 67eb28b31a..54c978c0fb 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.h +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -80,6 +80,7 @@ public: const TUserJobSpec& GetSpec() const; bool ShouldMountSandbox() const; ui64 GetTotalFileSize() const; + bool ShouldRedirectStdoutToStderr() const; private: TOperationPreparer& OperationPreparer_; @@ -95,6 +96,8 @@ private: TString Command_; ui64 TotalFileSize_ = 0; + bool IsCommandJob_ = false; + private: TString GetFileStorage() const; TYPath GetCachePath() const; diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h index de5f5ab7fc..3debc8a961 100644 --- a/yt/cpp/mapreduce/interface/config.h +++ b/yt/cpp/mapreduce/interface/config.h @@ -198,6 +198,9 @@ struct TConfig /// Which implemetation of table writer to use. ETableWriterVersion TableWriterVersion = ETableWriterVersion::Auto; + /// Redirects stdout to stderr for jobs. + bool RedirectStdoutToStderr = false; + static bool GetBool(const char* var, bool defaultValue = false); static int GetInt(const char* var, int defaultValue); static TDuration GetDuration(const char* var, TDuration defaultValue); diff --git a/yt/cpp/mapreduce/interface/helpers.cpp b/yt/cpp/mapreduce/interface/helpers.cpp new file mode 100644 index 0000000000..f396883ea5 --- /dev/null +++ b/yt/cpp/mapreduce/interface/helpers.cpp @@ -0,0 +1,18 @@ +#include "helpers.h" + +#include <util/string/cast.h> + +#include <util/system/env.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +int GetJobFirstOutputTableFD() +{ + return FromString<int>(GetEnv("YT_FIRST_OUTPUT_TABLE_FD", "1")); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/interface/helpers.h b/yt/cpp/mapreduce/interface/helpers.h new file mode 100644 index 0000000000..0aa8086b49 --- /dev/null +++ b/yt/cpp/mapreduce/interface/helpers.h @@ -0,0 +1,11 @@ +#pragma once + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +int GetJobFirstOutputTableFD(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/interface/operation.cpp b/yt/cpp/mapreduce/interface/operation.cpp index b3229117d9..d5b9271bcd 100644 --- a/yt/cpp/mapreduce/interface/operation.cpp +++ b/yt/cpp/mapreduce/interface/operation.cpp @@ -1,5 +1,7 @@ #include "operation.h" +#include <yt/cpp/mapreduce/interface/helpers.h> + #include <util/generic/iterator_range.h> namespace NYT { @@ -127,7 +129,7 @@ TRawJobContext::TRawJobContext(size_t outputTableCount) : InputFile_(Duplicate(0)) { for (size_t i = 0; i != outputTableCount; ++i) { - OutputFileList_.emplace_back(Duplicate(3 * i + 1)); + OutputFileList_.emplace_back(Duplicate(3 * i + GetJobFirstOutputTableFD())); } } diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make index 1d76b1dabb..f9bc3e172c 100644 --- a/yt/cpp/mapreduce/interface/ya.make +++ b/yt/cpp/mapreduce/interface/ya.make @@ -11,6 +11,7 @@ SRCS( cypress.cpp errors.cpp format.cpp + helpers.cpp job_counters.cpp job_statistics.cpp io.cpp diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp index 21b5ef6b91..3c86f9cbb4 100644 --- a/yt/cpp/mapreduce/io/job_writer.cpp +++ b/yt/cpp/mapreduce/io/job_writer.cpp @@ -1,5 +1,6 @@ #include "job_writer.h" +#include <yt/cpp/mapreduce/interface/helpers.h> #include <yt/cpp/mapreduce/interface/io.h> #include <util/system/file.h> @@ -27,8 +28,11 @@ TJobWriterStream::TJobWriterStream(const TFile& file) TJobWriter::TJobWriter(size_t outputTableCount) { + int firstOutputTableFD = GetJobFirstOutputTableFD(); + for (size_t i = 0; i < outputTableCount; ++i) { - Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(i * 3 + 1))); + int fd = static_cast<int>(i * 3 + firstOutputTableFD); + Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(fd)); } } @@ -73,7 +77,7 @@ THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount) TSingleStreamJobWriter::TSingleStreamJobWriter(size_t tableIndex) : TableIndex_(tableIndex) - , Stream_(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(tableIndex * 3 + 1))) + , Stream_(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(tableIndex * 3 + GetJobFirstOutputTableFD()))) { } size_t TSingleStreamJobWriter::GetStreamCount() const |