summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/yt/library/process/subprocess.cpp34
-rw-r--r--yt/yt/library/process/subprocess.h9
2 files changed, 36 insertions, 7 deletions
diff --git a/yt/yt/library/process/subprocess.cpp b/yt/yt/library/process/subprocess.cpp
index 02555b0c9bf..809ad13c499 100644
--- a/yt/yt/library/process/subprocess.cpp
+++ b/yt/yt/library/process/subprocess.cpp
@@ -20,12 +20,14 @@ using namespace NPipes;
////////////////////////////////////////////////////////////////////////////////
const static size_t PipeBlockSize = 64 * 1024;
-static NLogging::TLogger Logger("Subprocess");
+
+static const NLogging::TLogger Logger("Subprocess");
////////////////////////////////////////////////////////////////////////////////
-TSubprocess::TSubprocess(const TString& path, bool copyEnv)
- : Process_(New<TSimpleProcess>(path, copyEnv))
+TSubprocess::TSubprocess(TString path, bool copyEnv)
+ : Path_(std::move(path))
+ , Process_(New<TSimpleProcess>(Path_, copyEnv))
{ }
TSubprocess TSubprocess::CreateCurrentProcessSpawner()
@@ -43,9 +45,30 @@ void TSubprocess::AddArguments(std::initializer_list<TStringBuf> args)
Process_->AddArguments(args);
}
-TSubprocessResult TSubprocess::Execute(const TSharedRef& input)
+TSubprocessResult TSubprocess::Execute(const TSharedRef& input, TDuration timeout)
{
#ifdef _unix_
+ auto killCookie = TDelayedExecutor::Submit(
+ BIND([=, path = Path_, process = GetProcess()] {
+ YT_LOG_WARNING("Killing process due to timeout (Path: %v, ProcessId: %v, Timeout: %v)",
+ path,
+ process->GetProcessId(),
+ timeout);
+
+ try {
+ process->Kill(SIGKILL);
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR(ex, "Failed to kill process (Path: %v, ProcessId: %v)",
+ path,
+ process->GetProcessId());
+ }
+ }),
+ timeout);
+
+ auto cookieGuard = Finally([&] {
+ TDelayedExecutor::Cancel(killCookie);
+ });
+
auto inputStream = Process_->GetStdInWriter();
auto outputStream = Process_->GetStdOutReader();
auto errorStream = Process_->GetStdErrReader();
@@ -58,8 +81,9 @@ TSubprocessResult TSubprocess::Execute(const TSharedRef& input)
auto size = WaitFor(stream->Read(buffer))
.ValueOrThrow();
- if (size == 0)
+ if (size == 0) {
break;
+ }
// ToDo(psushin): eliminate copying.
output.Append(buffer.Begin(), size);
diff --git a/yt/yt/library/process/subprocess.h b/yt/yt/library/process/subprocess.h
index 223db533f6b..68dab8896c5 100644
--- a/yt/yt/library/process/subprocess.h
+++ b/yt/yt/library/process/subprocess.h
@@ -21,14 +21,17 @@ struct TSubprocessResult
class TSubprocess
{
public:
- explicit TSubprocess(const TString& path, bool copyEnv = true);
+ explicit TSubprocess(TString path, bool copyEnv = true);
static TSubprocess CreateCurrentProcessSpawner();
void AddArgument(TStringBuf arg);
void AddArguments(std::initializer_list<TStringBuf> args);
- TSubprocessResult Execute(const TSharedRef& input = TSharedRef::MakeEmpty());
+ TSubprocessResult Execute(
+ const TSharedRef& input = TSharedRef::MakeEmpty(),
+ TDuration timeout = TDuration::Max());
+
void Kill(int signal);
TString GetCommandLine() const;
@@ -36,6 +39,8 @@ public:
TProcessBasePtr GetProcess() const;
private:
+ const TString Path_;
+
const TProcessBasePtr Process_;
};