diff options
| -rw-r--r-- | yt/yt/library/process/subprocess.cpp | 34 | ||||
| -rw-r--r-- | yt/yt/library/process/subprocess.h | 9 |
2 files changed, 36 insertions, 7 deletions
diff --git a/yt/yt/library/process/subprocess.cpp b/yt/yt/library/process/subprocess.cpp index 02555b0c9bf..809ad13c499 100644 --- a/yt/yt/library/process/subprocess.cpp +++ b/yt/yt/library/process/subprocess.cpp @@ -20,12 +20,14 @@ using namespace NPipes; //////////////////////////////////////////////////////////////////////////////// const static size_t PipeBlockSize = 64 * 1024; -static NLogging::TLogger Logger("Subprocess"); + +static const NLogging::TLogger Logger("Subprocess"); //////////////////////////////////////////////////////////////////////////////// -TSubprocess::TSubprocess(const TString& path, bool copyEnv) - : Process_(New<TSimpleProcess>(path, copyEnv)) +TSubprocess::TSubprocess(TString path, bool copyEnv) + : Path_(std::move(path)) + , Process_(New<TSimpleProcess>(Path_, copyEnv)) { } TSubprocess TSubprocess::CreateCurrentProcessSpawner() @@ -43,9 +45,30 @@ void TSubprocess::AddArguments(std::initializer_list<TStringBuf> args) Process_->AddArguments(args); } -TSubprocessResult TSubprocess::Execute(const TSharedRef& input) +TSubprocessResult TSubprocess::Execute(const TSharedRef& input, TDuration timeout) { #ifdef _unix_ + auto killCookie = TDelayedExecutor::Submit( + BIND([=, path = Path_, process = GetProcess()] { + YT_LOG_WARNING("Killing process due to timeout (Path: %v, ProcessId: %v, Timeout: %v)", + path, + process->GetProcessId(), + timeout); + + try { + process->Kill(SIGKILL); + } catch (const std::exception& ex) { + YT_LOG_ERROR(ex, "Failed to kill process (Path: %v, ProcessId: %v)", + path, + process->GetProcessId()); + } + }), + timeout); + + auto cookieGuard = Finally([&] { + TDelayedExecutor::Cancel(killCookie); + }); + auto inputStream = Process_->GetStdInWriter(); auto outputStream = Process_->GetStdOutReader(); auto errorStream = Process_->GetStdErrReader(); @@ -58,8 +81,9 @@ TSubprocessResult TSubprocess::Execute(const TSharedRef& input) auto size = WaitFor(stream->Read(buffer)) .ValueOrThrow(); - if (size == 0) + if (size == 0) { break; + } // ToDo(psushin): eliminate copying. output.Append(buffer.Begin(), size); diff --git a/yt/yt/library/process/subprocess.h b/yt/yt/library/process/subprocess.h index 223db533f6b..68dab8896c5 100644 --- a/yt/yt/library/process/subprocess.h +++ b/yt/yt/library/process/subprocess.h @@ -21,14 +21,17 @@ struct TSubprocessResult class TSubprocess { public: - explicit TSubprocess(const TString& path, bool copyEnv = true); + explicit TSubprocess(TString path, bool copyEnv = true); static TSubprocess CreateCurrentProcessSpawner(); void AddArgument(TStringBuf arg); void AddArguments(std::initializer_list<TStringBuf> args); - TSubprocessResult Execute(const TSharedRef& input = TSharedRef::MakeEmpty()); + TSubprocessResult Execute( + const TSharedRef& input = TSharedRef::MakeEmpty(), + TDuration timeout = TDuration::Max()); + void Kill(int signal); TString GetCommandLine() const; @@ -36,6 +39,8 @@ public: TProcessBasePtr GetProcess() const; private: + const TString Path_; + const TProcessBasePtr Process_; }; |
