aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorapachee <apachee@yandex-team.com>2024-05-24 20:47:38 +0300
committerapachee <apachee@yandex-team.com>2024-05-24 20:58:06 +0300
commit3e42aa91c70358fdd3ec39d0ac23072fc230b209 (patch)
tree422612a54143006d874a3997b1ef10efe34da552
parentb45547676013a5d43cd0072a85643326e686b6fc (diff)
downloadydb-3e42aa91c70358fdd3ec39d0ac23072fc230b209.tar.gz
YT-21308: Add redirect_stdout_to_stderr flag for C++ client
Adds redirect_stdout_to_stderr spec option for operations that allows writing to stdout as if it was stderr. 6a8ac5f21955a79848d86f72715628c7b8bb65c4
-rw-r--r--yt/cpp/mapreduce/client/operation.cpp3
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp6
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.h3
-rw-r--r--yt/cpp/mapreduce/interface/config.h3
-rw-r--r--yt/cpp/mapreduce/interface/helpers.cpp18
-rw-r--r--yt/cpp/mapreduce/interface/helpers.h11
-rw-r--r--yt/cpp/mapreduce/interface/operation.cpp4
-rw-r--r--yt/cpp/mapreduce/interface/ya.make1
-rw-r--r--yt/cpp/mapreduce/io/job_writer.cpp8
9 files changed, 53 insertions, 4 deletions
diff --git a/yt/cpp/mapreduce/client/operation.cpp b/yt/cpp/mapreduce/client/operation.cpp
index ade0f339b7..496b2f5723 100644
--- a/yt/cpp/mapreduce/client/operation.cpp
+++ b/yt/cpp/mapreduce/client/operation.cpp
@@ -756,7 +756,8 @@ void BuildUserJobFluently(
.DoFor(userJobSpec.JobProfilers_, [&] (TFluentList list, const auto& jobProfiler) {
list.Item().Value(BuildJobProfilerSpec(jobProfiler));
})
- .EndList();
+ .EndList()
+ .Item("redirect_stdout_to_stderr").Value(preparer.ShouldRedirectStdoutToStderr());
}
template <typename T>
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index ec822e607e..181d8def03 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -419,6 +419,7 @@ TJobPreparer::TJobPreparer(
}
if (auto commandJob = dynamic_cast<const ICommandJob*>(&job)) {
+ IsCommandJob_ = true;
ClassName_ = TJobFactory::Get()->GetJobName(&job);
Command_ = commandJob->GetCommand();
} else {
@@ -465,6 +466,11 @@ ui64 TJobPreparer::GetTotalFileSize() const
return TotalFileSize_;
}
+bool TJobPreparer::ShouldRedirectStdoutToStderr() const
+{
+ return !IsCommandJob_ && OperationPreparer_.GetContext().Config->RedirectStdoutToStderr;
+}
+
TString TJobPreparer::GetFileStorage() const
{
return Options_.FileStorage_ ?
diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h
index 67eb28b31a..54c978c0fb 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.h
+++ b/yt/cpp/mapreduce/client/operation_preparer.h
@@ -80,6 +80,7 @@ public:
const TUserJobSpec& GetSpec() const;
bool ShouldMountSandbox() const;
ui64 GetTotalFileSize() const;
+ bool ShouldRedirectStdoutToStderr() const;
private:
TOperationPreparer& OperationPreparer_;
@@ -95,6 +96,8 @@ private:
TString Command_;
ui64 TotalFileSize_ = 0;
+ bool IsCommandJob_ = false;
+
private:
TString GetFileStorage() const;
TYPath GetCachePath() const;
diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h
index de5f5ab7fc..3debc8a961 100644
--- a/yt/cpp/mapreduce/interface/config.h
+++ b/yt/cpp/mapreduce/interface/config.h
@@ -198,6 +198,9 @@ struct TConfig
/// Which implemetation of table writer to use.
ETableWriterVersion TableWriterVersion = ETableWriterVersion::Auto;
+ /// Redirects stdout to stderr for jobs.
+ bool RedirectStdoutToStderr = false;
+
static bool GetBool(const char* var, bool defaultValue = false);
static int GetInt(const char* var, int defaultValue);
static TDuration GetDuration(const char* var, TDuration defaultValue);
diff --git a/yt/cpp/mapreduce/interface/helpers.cpp b/yt/cpp/mapreduce/interface/helpers.cpp
new file mode 100644
index 0000000000..f396883ea5
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/helpers.cpp
@@ -0,0 +1,18 @@
+#include "helpers.h"
+
+#include <util/string/cast.h>
+
+#include <util/system/env.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+int GetJobFirstOutputTableFD()
+{
+ return FromString<int>(GetEnv("YT_FIRST_OUTPUT_TABLE_FD", "1"));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/helpers.h b/yt/cpp/mapreduce/interface/helpers.h
new file mode 100644
index 0000000000..0aa8086b49
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/helpers.h
@@ -0,0 +1,11 @@
+#pragma once
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+int GetJobFirstOutputTableFD();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/interface/operation.cpp b/yt/cpp/mapreduce/interface/operation.cpp
index b3229117d9..d5b9271bcd 100644
--- a/yt/cpp/mapreduce/interface/operation.cpp
+++ b/yt/cpp/mapreduce/interface/operation.cpp
@@ -1,5 +1,7 @@
#include "operation.h"
+#include <yt/cpp/mapreduce/interface/helpers.h>
+
#include <util/generic/iterator_range.h>
namespace NYT {
@@ -127,7 +129,7 @@ TRawJobContext::TRawJobContext(size_t outputTableCount)
: InputFile_(Duplicate(0))
{
for (size_t i = 0; i != outputTableCount; ++i) {
- OutputFileList_.emplace_back(Duplicate(3 * i + 1));
+ OutputFileList_.emplace_back(Duplicate(3 * i + GetJobFirstOutputTableFD()));
}
}
diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make
index 1d76b1dabb..f9bc3e172c 100644
--- a/yt/cpp/mapreduce/interface/ya.make
+++ b/yt/cpp/mapreduce/interface/ya.make
@@ -11,6 +11,7 @@ SRCS(
cypress.cpp
errors.cpp
format.cpp
+ helpers.cpp
job_counters.cpp
job_statistics.cpp
io.cpp
diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp
index 21b5ef6b91..3c86f9cbb4 100644
--- a/yt/cpp/mapreduce/io/job_writer.cpp
+++ b/yt/cpp/mapreduce/io/job_writer.cpp
@@ -1,5 +1,6 @@
#include "job_writer.h"
+#include <yt/cpp/mapreduce/interface/helpers.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <util/system/file.h>
@@ -27,8 +28,11 @@ TJobWriterStream::TJobWriterStream(const TFile& file)
TJobWriter::TJobWriter(size_t outputTableCount)
{
+ int firstOutputTableFD = GetJobFirstOutputTableFD();
+
for (size_t i = 0; i < outputTableCount; ++i) {
- Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(i * 3 + 1)));
+ int fd = static_cast<int>(i * 3 + firstOutputTableFD);
+ Streams_.emplace_back(std::make_unique<NDetail::TJobWriterStream>(fd));
}
}
@@ -73,7 +77,7 @@ THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount)
TSingleStreamJobWriter::TSingleStreamJobWriter(size_t tableIndex)
: TableIndex_(tableIndex)
- , Stream_(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(tableIndex * 3 + 1)))
+ , Stream_(std::make_unique<NDetail::TJobWriterStream>(static_cast<int>(tableIndex * 3 + GetJobFirstOutputTableFD())))
{ }
size_t TSingleStreamJobWriter::GetStreamCount() const