diff options
author | kerzum <[email protected]> | 2022-02-10 16:49:33 +0300 |
---|---|---|
committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:49:33 +0300 |
commit | 47a7e7b29636bfb2deb1df5f92363b3c75229c95 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /util/system/shellcommand.cpp | |
parent | 9a7232babfd763ccfe827bc70e82e0f50cfd8276 (diff) |
Restoring authorship annotation for <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'util/system/shellcommand.cpp')
-rw-r--r-- | util/system/shellcommand.cpp | 552 |
1 files changed, 276 insertions, 276 deletions
diff --git a/util/system/shellcommand.cpp b/util/system/shellcommand.cpp index 3cc23600cfa..b1989b5c8c3 100644 --- a/util/system/shellcommand.cpp +++ b/util/system/shellcommand.cpp @@ -4,7 +4,7 @@ #include "sigset.h" #include "atomic.h" -#include <util/folder/dirut.h> +#include <util/folder/dirut.h> #include <util/generic/algorithm.h> #include <util/generic/buffer.h> #include <util/generic/vector.h> @@ -18,12 +18,12 @@ #include <errno.h> -#if defined(_unix_) +#if defined(_unix_) #include <unistd.h> #include <fcntl.h> #include <grp.h> #include <sys/wait.h> - + using TPid = pid_t; using TWaitResult = pid_t; using TExitStatus = int; @@ -34,25 +34,25 @@ using TGetGroupListGid = int; #else using TGetGroupListGid = gid_t; #endif -#elif defined(_win_) +#elif defined(_win_) #include <string> #include "winint.h" - + using TPid = HANDLE; using TWaitResult = DWORD; using TExitStatus = DWORD; #define WAIT_PROCEED WAIT_TIMEOUT #pragma warning(disable : 4296) // 'wait_result >= WAIT_OBJECT_0' : expression is always tru -#else +#else #error("unknown os, shell command is not implemented") -#endif - +#endif + #define DBG(stmt) \ {} -// #define DBG(stmt) stmt - +// #define DBG(stmt) stmt + namespace { constexpr static size_t DATA_BUFFER_SIZE = 128 * 1024; @@ -107,89 +107,89 @@ namespace { #endif } -// temporary measure to avoid rewriting all poll calls on win TPipeHandle -#if defined(_win_) +// temporary measure to avoid rewriting all poll calls on win TPipeHandle +#if defined(_win_) using REALPIPEHANDLE = HANDLE; #define INVALID_REALPIPEHANDLE INVALID_HANDLE_VALUE - -class TRealPipeHandle + +class TRealPipeHandle : public TNonCopyable { -public: +public: inline TRealPipeHandle() noexcept - : Fd_(INVALID_REALPIPEHANDLE) - { - } - + : Fd_(INVALID_REALPIPEHANDLE) + { + } + inline TRealPipeHandle(REALPIPEHANDLE fd) noexcept - : Fd_(fd) - { - } - + : Fd_(fd) + { + } + inline ~TRealPipeHandle() { - Close(); - } - + Close(); + } + bool Close() noexcept { - bool ok = true; - if (Fd_ != INVALID_REALPIPEHANDLE) - ok = CloseHandle(Fd_); - Fd_ = INVALID_REALPIPEHANDLE; - return ok; - } - + bool ok = true; + if (Fd_ != INVALID_REALPIPEHANDLE) + ok = CloseHandle(Fd_); + Fd_ = INVALID_REALPIPEHANDLE; + return ok; + } + inline REALPIPEHANDLE Release() noexcept { - REALPIPEHANDLE ret = Fd_; - Fd_ = INVALID_REALPIPEHANDLE; - return ret; - } - + REALPIPEHANDLE ret = Fd_; + Fd_ = INVALID_REALPIPEHANDLE; + return ret; + } + inline void Swap(TRealPipeHandle& r) noexcept { - DoSwap(Fd_, r.Fd_); - } - + DoSwap(Fd_, r.Fd_); + } + inline operator REALPIPEHANDLE() const noexcept { - return Fd_; - } - + return Fd_; + } + inline bool IsOpen() const noexcept { - return Fd_ != INVALID_REALPIPEHANDLE; - } - + return Fd_ != INVALID_REALPIPEHANDLE; + } + ssize_t Read(void* buffer, size_t byteCount) const noexcept { - DWORD doneBytes; + DWORD doneBytes; if (!ReadFile(Fd_, buffer, byteCount, &doneBytes, nullptr)) - return -1; - return doneBytes; - } + return -1; + return doneBytes; + } ssize_t Write(const void* buffer, size_t byteCount) const noexcept { - DWORD doneBytes; + DWORD doneBytes; if (!WriteFile(Fd_, buffer, byteCount, &doneBytes, nullptr)) - return -1; - return doneBytes; - } - + return -1; + return doneBytes; + } + static void Pipe(TRealPipeHandle& reader, TRealPipeHandle& writer, EOpenMode mode) { (void)mode; - REALPIPEHANDLE fds[2]; + REALPIPEHANDLE fds[2]; if (!CreatePipe(&fds[0], &fds[1], nullptr /* handles are not inherited */, 0)) - ythrow TFileError() << "failed to create a pipe"; - TRealPipeHandle(fds[0]).Swap(reader); - TRealPipeHandle(fds[1]).Swap(writer); - } - + ythrow TFileError() << "failed to create a pipe"; + TRealPipeHandle(fds[0]).Swap(reader); + TRealPipeHandle(fds[1]).Swap(writer); + } + private: - REALPIPEHANDLE Fd_; -}; - -#else + REALPIPEHANDLE Fd_; +}; + +#else using TRealPipeHandle = TPipeHandle; using REALPIPEHANDLE = PIPEHANDLE; #define INVALID_REALPIPEHANDLE INVALID_PIPEHANDLE -#endif - -class TShellCommand::TImpl +#endif + +class TShellCommand::TImpl : public TAtomicRefCount<TShellCommand::TImpl> { -private: +private: TPid Pid; TString Command; TList<TString> Arguments; @@ -208,7 +208,7 @@ private: TFileHandle OutputHandle; TFileHandle ErrorHandle; - /// @todo: store const TShellCommandOptions, no need for so many vars + /// @todo: store const TShellCommandOptions, no need for so many vars bool TerminateFlag = false; bool ClearSignalMask = false; bool CloseAllFdsOnExec = false; @@ -230,9 +230,9 @@ private: struct TProcessInfo { TImpl* Parent; - TRealPipeHandle InputFd; - TRealPipeHandle OutputFd; - TRealPipeHandle ErrorFd; + TRealPipeHandle InputFd; + TRealPipeHandle OutputFd; + TRealPipeHandle ErrorFd; TProcessInfo(TImpl* parent, REALPIPEHANDLE inputFd, REALPIPEHANDLE outputFd, REALPIPEHANDLE errorFd) : Parent(parent) , InputFd(inputFd) @@ -242,12 +242,12 @@ private: } }; - struct TPipes { - TRealPipeHandle OutputPipeFd[2]; - TRealPipeHandle ErrorPipeFd[2]; - TRealPipeHandle InputPipeFd[2]; - // pipes are closed by automatic dtor - void PrepareParents() { + struct TPipes { + TRealPipeHandle OutputPipeFd[2]; + TRealPipeHandle ErrorPipeFd[2]; + TRealPipeHandle InputPipeFd[2]; + // pipes are closed by automatic dtor + void PrepareParents() { if (OutputPipeFd[1].IsOpen()) { OutputPipeFd[1].Close(); } @@ -255,16 +255,16 @@ private: ErrorPipeFd[1].Close(); } if (InputPipeFd[1].IsOpen()) { - InputPipeFd[0].Close(); + InputPipeFd[0].Close(); } - } - void ReleaseParents() { - InputPipeFd[1].Release(); - OutputPipeFd[0].Release(); - ErrorPipeFd[0].Release(); - } - }; - + } + void ReleaseParents() { + InputPipeFd[1].Release(); + OutputPipeFd[0].Release(); + ErrorPipeFd[0].Release(); + } + }; + struct TPipePump { TRealPipeHandle* Pipe; IOutputStream* OutputStream; @@ -273,12 +273,12 @@ private: TString InternalError; }; -#if defined(_unix_) +#if defined(_unix_) void OnFork(TPipes& pipes, sigset_t oldmask, char* const* argv, char* const* envp, const std::function<void()>& afterFork) const; -#else - void StartProcess(TPipes& pipes); -#endif - +#else + void StartProcess(TPipes& pipes); +#endif + public: inline TImpl(const TStringBuf cmd, const TList<TString>& args, const TShellCommandOptions& options, const TString& workdir) : Pid(0) @@ -298,7 +298,7 @@ public: , UseShell(options.UseShell) , QuoteArguments(options.QuoteArguments) , DetachSession(options.DetachSession) - , CloseStreams(options.CloseStreams) + , CloseStreams(options.CloseStreams) , ShouldCloseInput(options.ShouldCloseInput) , InputMode(options.InputMode) , OutputMode(options.OutputMode) @@ -386,24 +386,24 @@ public: return ErrorHandle; } - // start child process - void Run(); + // start child process + void Run(); inline void Terminate() { if (!!Pid && (AtomicGet(ExecutionStatus) == SHELL_RUNNING)) { - bool ok = -#if defined(_unix_) + bool ok = +#if defined(_unix_) kill(DetachSession ? -1 * Pid : Pid, SIGTERM) == 0; if (!ok && (errno == ESRCH) && DetachSession) { // this could fail when called before child proc completes setsid(). ok = kill(Pid, SIGTERM) == 0; kill(-Pid, SIGTERM); // between a failed kill(-Pid) and a successful kill(Pid) a grandchild could have been spawned } -#else +#else TerminateProcess(Pid, 1 /* exit code */); -#endif +#endif if (!ok) { - ythrow TSystemError() << "cannot terminate " << Pid; + ythrow TSystemError() << "cannot terminate " << Pid; } } } @@ -426,28 +426,28 @@ public: pi->InputFd.Close(); pi->ErrorFd.Close(); pi->OutputFd.Close(); - - if (pi->Parent->CloseStreams) { + + if (pi->Parent->CloseStreams) { if (pi->Parent->ErrorStream) { - pi->Parent->ErrorStream->Finish(); + pi->Parent->ErrorStream->Finish(); } if (pi->Parent->OutputStream) { - pi->Parent->OutputStream->Finish(); + pi->Parent->OutputStream->Finish(); } - } - + } + delete pi; return true; } - // interchange io while process is alive + // interchange io while process is alive inline static void Communicate(TProcessInfo* pi); inline static void* WatchProcess(void* data) { TProcessInfo* pi = reinterpret_cast<TProcessInfo*>(data); - Communicate(pi); + Communicate(pi); return nullptr; - } + } inline static void* ReadStream(void* data) noexcept { TPipePump* pump = reinterpret_cast<TPipePump*>(data); @@ -510,16 +510,16 @@ public: } TString GetQuotedCommand() const; -}; +}; -#if defined(_win_) -void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) { - // Setup STARTUPINFO to redirect handles. +#if defined(_win_) +void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) { + // Setup STARTUPINFO to redirect handles. STARTUPINFOW startup_info; - ZeroMemory(&startup_info, sizeof(startup_info)); - startup_info.cb = sizeof(startup_info); - startup_info.dwFlags = STARTF_USESTDHANDLES; - + ZeroMemory(&startup_info, sizeof(startup_info)); + startup_info.cb = sizeof(startup_info); + startup_info.dwFlags = STARTF_USESTDHANDLES; + if (OutputMode != TShellCommandOptions::HANDLE_INHERIT) { if (!SetHandleInformation(pipes.OutputPipeFd[1], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) { ythrow TSystemError() << "cannot set handle info"; @@ -531,11 +531,11 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) { } } if (InputMode != TShellCommandOptions::HANDLE_INHERIT) { - if (!SetHandleInformation(pipes.InputPipeFd[0], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) - ythrow TSystemError() << "cannot set handle info"; + if (!SetHandleInformation(pipes.InputPipeFd[0], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) + ythrow TSystemError() << "cannot set handle info"; } - - // A sockets do not work as std streams for some reason + + // A sockets do not work as std streams for some reason if (OutputMode != TShellCommandOptions::HANDLE_INHERIT) { startup_info.hStdOutput = pipes.OutputPipeFd[1]; } else { @@ -547,20 +547,20 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) { startup_info.hStdError = GetStdHandle(STD_ERROR_HANDLE); } if (InputMode != TShellCommandOptions::HANDLE_INHERIT) { - startup_info.hStdInput = pipes.InputPipeFd[0]; + startup_info.hStdInput = pipes.InputPipeFd[0]; } else { // Don't leave hStdInput unfilled, otherwise any attempt to retrieve the operating-system file handle // that is associated with the specified file descriptor will led to errors. startup_info.hStdInput = GetStdHandle(STD_INPUT_HANDLE); } - - PROCESS_INFORMATION process_info; + + PROCESS_INFORMATION process_info; // TString cmd = "cmd /U" + TUtf16String can be used to read unicode messages from cmd - // /A - ansi charset /Q - echo off, /C - command, /Q - special quotes + // /A - ansi charset /Q - echo off, /C - command, /Q - special quotes TString qcmd = GetQuotedCommand(); TString cmd = UseShell ? "cmd /A /Q /S /C \"" + qcmd + "\"" : qcmd; - // winapi can modify command text, copy it - + // winapi can modify command text, copy it + Y_ENSURE_EX(cmd.size() < MAX_COMMAND_LINE, yexception() << "Command is too long (length=" << cmd.size() << ")"); TTempArray<wchar_t> cmdcopy(MAX_COMMAND_LINE); Copy(cmd.data(), cmd.data() + cmd.size(), cmdcopy.Data()); @@ -617,16 +617,16 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) { if (!res) { AtomicSet(ExecutionStatus, SHELL_ERROR); - /// @todo: write to error stream if set + /// @todo: write to error stream if set TStringOutput out(CollectedError); out << "Process was not created: " << LastSystemErrorText() << " command text was: '" << GetAString(cmdcopy.Data()) << "'"; - } + } Pid = process_info.hProcess; CloseHandle(process_info.hThread); DBG(Cerr << "created process id " << Pid << " in dir: " << cwd << ", cmd: " << cmdcopy.Data() << Endl); -} -#endif - +} +#endif + void ShellQuoteArg(TString& dst, TStringBuf argument) { dst.append("\""); TStringBuf l, r; @@ -657,14 +657,14 @@ TString TShellCommand::TImpl::GetQuotedCommand() const { // Don't add unnecessary quotes. It's especially important for the windows with a 32k command line length limit. if (QuoteArguments && ArgNeedsQuotes(argument)) { ::ShellQuoteArgSp(quoted, argument); - } else { + } else { quoted.append(" ").append(argument); - } - } - return quoted; -} - -#if defined(_unix_) + } + } + return quoted; +} + +#if defined(_unix_) void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const* argv, char* const* envp, const std::function<void()>& afterFork) const { try { if (DetachSession) { @@ -715,7 +715,7 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const* sErr.Release(); sErrNew.Release(); } - + if (WorkDir.size()) { NFs::SetCurrentWorkingDirectory(WorkDir); } @@ -724,8 +724,8 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const* for (int fd = NSystemInfo::MaxOpenFiles(); fd > STDERR_FILENO; --fd) { fcntl(fd, F_SETFD, FD_CLOEXEC); } - } - + } + if (!User.Name.empty()) { ImpersonateUser(User); } @@ -750,18 +750,18 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const* Cerr << "Process was not created: " << "unknown error" << Endl; } - + _exit(-1); -} -#endif - -void TShellCommand::TImpl::Run() { +} +#endif + +void TShellCommand::TImpl::Run() { Y_ENSURE(AtomicGet(ExecutionStatus) != SHELL_RUNNING, TStringBuf("Process is already running")); - // Prepare I/O streams - CollectedOutput.clear(); - CollectedError.clear(); - TPipes pipes; - + // Prepare I/O streams + CollectedOutput.clear(); + CollectedError.clear(); + TPipes pipes; + if (OutputMode != TShellCommandOptions::HANDLE_INHERIT) { TRealPipeHandle::Pipe(pipes.OutputPipeFd[0], pipes.OutputPipeFd[1], CloseOnExec); } @@ -770,11 +770,11 @@ void TShellCommand::TImpl::Run() { } if (InputMode != TShellCommandOptions::HANDLE_INHERIT) { TRealPipeHandle::Pipe(pipes.InputPipeFd[0], pipes.InputPipeFd[1], CloseOnExec); - } - + } + AtomicSet(ExecutionStatus, SHELL_RUNNING); - -#if defined(_unix_) + +#if defined(_unix_) // block all signals to avoid signal handler race after fork() sigset_t oldmask, newmask; SigFillSet(&newmask); @@ -817,12 +817,12 @@ void TShellCommand::TImpl::Run() { envp.push_back(nullptr); } - pid_t pid = fork(); - if (pid == -1) { + pid_t pid = fork(); + if (pid == -1) { AtomicSet(ExecutionStatus, SHELL_ERROR); - /// @todo check if pipes are still open + /// @todo check if pipes are still open ythrow TSystemError() << "Cannot fork"; - } else if (pid == 0) { // child + } else if (pid == 0) { // child if (envp.size() != 0) { OnFork(pipes, oldmask, qargv.data(), envp.data(), FuncAfterFork); } else { @@ -833,17 +833,17 @@ void TShellCommand::TImpl::Run() { if (SigProcMask(SIG_SETMASK, &oldmask, nullptr) != 0) { ythrow TSystemError() << "Cannot restore signal mask in parent"; } - } - Pid = pid; -#else - StartProcess(pipes); -#endif - pipes.PrepareParents(); - + } + Pid = pid; +#else + StartProcess(pipes); +#endif + pipes.PrepareParents(); + if (AtomicGet(ExecutionStatus) != SHELL_RUNNING) { - return; + return; } - + if (InputMode == TShellCommandOptions::HANDLE_PIPE) { TFileHandle inputHandle(pipes.InputPipeFd[1].Release()); InputHandle.Swap(inputHandle); @@ -861,32 +861,32 @@ void TShellCommand::TImpl::Run() { TProcessInfo* processInfo = new TProcessInfo(this, pipes.InputPipeFd[1].Release(), pipes.OutputPipeFd[0].Release(), pipes.ErrorPipeFd[0].Release()); - if (AsyncMode) { + if (AsyncMode) { WatchThread = new TThread(&TImpl::WatchProcess, processInfo); - WatchThread->Start(); - /// @todo wait for child to start its process session (if options.Detach) + WatchThread->Start(); + /// @todo wait for child to start its process session (if options.Detach) } else { - Communicate(processInfo); - } + Communicate(processInfo); + } + + pipes.ReleaseParents(); // not needed +} - pipes.ReleaseParents(); // not needed -} - -void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { +void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { THolder<IOutputStream> outputHolder; IOutputStream* output = pi->Parent->OutputStream; if (!output) { - outputHolder.Reset(output = new TStringOutput(pi->Parent->CollectedOutput)); + outputHolder.Reset(output = new TStringOutput(pi->Parent->CollectedOutput)); } - + THolder<IOutputStream> errorHolder; IOutputStream* error = pi->Parent->ErrorStream; if (!error) { - errorHolder.Reset(error = new TStringOutput(pi->Parent->CollectedError)); + errorHolder.Reset(error = new TStringOutput(pi->Parent->CollectedError)); } - + IInputStream*& input = pi->Parent->InputStream; - + #if defined(_unix_) // not really needed, io is done via poll if (pi->OutputFd.IsOpen()) { @@ -900,7 +900,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { } #endif - try { + try { #if defined(_win_) TPipePump pumps[3] = {0}; pumps[0] = {&pi->ErrorFd, error}; @@ -920,172 +920,172 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { #else TBuffer buffer(DATA_BUFFER_SIZE); TBuffer inputBuffer(DATA_BUFFER_SIZE); - int bytes; - int bytesToWrite = 0; + int bytes; + int bytesToWrite = 0; char* bufPos = nullptr; #endif - TWaitResult waitPidResult; + TWaitResult waitPidResult; TExitStatus status = 0; - - while (true) { - { + + while (true) { + { with_lock (pi->Parent->TerminateMutex) { if (TerminateIsRequired(pi)) { return; } } - waitPidResult = -#if defined(_unix_) + waitPidResult = +#if defined(_unix_) waitpid(pi->Parent->Pid, &status, WNOHANG); -#else +#else WaitForSingleObject(pi->Parent->Pid /* process_info.hProcess */, pi->Parent->PollDelayMs /* ms */); Y_UNUSED(status); -#endif - // DBG(Cerr << "wait result: " << waitPidResult << Endl); +#endif + // DBG(Cerr << "wait result: " << waitPidResult << Endl); if (waitPidResult != WAIT_PROCEED) { - break; + break; } - } + } /// @todo factor out (poll + wfmo) #if defined(_unix_) bool haveIn = false; bool haveOut = false; bool haveErr = false; - if (!input && pi->InputFd.IsOpen()) { - DBG(Cerr << "closing input stream..." << Endl); - pi->InputFd.Close(); - } - if (!output && pi->OutputFd.IsOpen()) { - DBG(Cerr << "closing output stream..." << Endl); - pi->OutputFd.Close(); - } - if (!error && pi->ErrorFd.IsOpen()) { - DBG(Cerr << "closing error stream..." << Endl); - pi->ErrorFd.Close(); - } - + if (!input && pi->InputFd.IsOpen()) { + DBG(Cerr << "closing input stream..." << Endl); + pi->InputFd.Close(); + } + if (!output && pi->OutputFd.IsOpen()) { + DBG(Cerr << "closing output stream..." << Endl); + pi->OutputFd.Close(); + } + if (!error && pi->ErrorFd.IsOpen()) { + DBG(Cerr << "closing error stream..." << Endl); + pi->ErrorFd.Close(); + } + if (!input && !output && !error) { - continue; + continue; } - + struct pollfd fds[] = { {REALPIPEHANDLE(pi->InputFd), POLLOUT, 0}, {REALPIPEHANDLE(pi->OutputFd), POLLIN, 0}, {REALPIPEHANDLE(pi->ErrorFd), POLLIN, 0}}; - int res; - + int res; + if (!input) { - fds[0].events = 0; + fds[0].events = 0; } if (!output) { - fds[1].events = 0; + fds[1].events = 0; } if (!error) { - fds[2].events = 0; + fds[2].events = 0; } - - res = PollD(fds, 3, TInstant::Now() + TDuration::MilliSeconds(pi->Parent->PollDelayMs)); - // DBG(Cerr << "poll result: " << res << Endl); + + res = PollD(fds, 3, TInstant::Now() + TDuration::MilliSeconds(pi->Parent->PollDelayMs)); + // DBG(Cerr << "poll result: " << res << Endl); if (-res == ETIMEDOUT || res == 0) { - // DBG(Cerr << "poll again..." << Endl); - continue; - } + // DBG(Cerr << "poll again..." << Endl); + continue; + } if (res < 0) { - ythrow yexception() << "poll failed: " << LastSystemErrorText(); + ythrow yexception() << "poll failed: " << LastSystemErrorText(); } - + if ((fds[1].revents & POLLIN) == POLLIN) { - haveOut = true; + haveOut = true; } else if (fds[1].revents & (POLLERR | POLLHUP)) { output = nullptr; } - + if ((fds[2].revents & POLLIN) == POLLIN) { - haveErr = true; + haveErr = true; } else if (fds[2].revents & (POLLERR | POLLHUP)) { error = nullptr; } - + if (input && ((fds[0].revents & POLLOUT) == POLLOUT)) { - haveIn = true; + haveIn = true; } - if (haveOut) { - bytes = pi->OutputFd.Read(buffer.Data(), buffer.Capacity()); - DBG(Cerr << "transferred " << bytes << " bytes of output" << Endl); + if (haveOut) { + bytes = pi->OutputFd.Read(buffer.Data(), buffer.Capacity()); + DBG(Cerr << "transferred " << bytes << " bytes of output" << Endl); if (bytes > 0) { output->Write(buffer.Data(), bytes); } else { output = nullptr; } - } - if (haveErr) { - bytes = pi->ErrorFd.Read(buffer.Data(), buffer.Capacity()); - DBG(Cerr << "transferred " << bytes << " bytes of error" << Endl); + } + if (haveErr) { + bytes = pi->ErrorFd.Read(buffer.Data(), buffer.Capacity()); + DBG(Cerr << "transferred " << bytes << " bytes of error" << Endl); if (bytes > 0) { error->Write(buffer.Data(), bytes); } else { error = nullptr; } - } - - if (haveIn) { - if (!bytesToWrite) { - bytesToWrite = input->Read(inputBuffer.Data(), inputBuffer.Capacity()); - if (bytesToWrite == 0) { + } + + if (haveIn) { + if (!bytesToWrite) { + bytesToWrite = input->Read(inputBuffer.Data(), inputBuffer.Capacity()); + if (bytesToWrite == 0) { if (AtomicGet(pi->Parent->ShouldCloseInput)) { input = nullptr; } - continue; + continue; } - bufPos = inputBuffer.Data(); + bufPos = inputBuffer.Data(); } - + bytes = pi->InputFd.Write(bufPos, bytesToWrite); - if (bytes > 0) { - bytesToWrite -= bytes; - bufPos += bytes; + if (bytes > 0) { + bytesToWrite -= bytes; + bufPos += bytes; } else { input = nullptr; } - - DBG(Cerr << "transferred " << bytes << " bytes of input" << Endl); + + DBG(Cerr << "transferred " << bytes << " bytes of input" << Endl); } #endif - } - DBG(Cerr << "process finished" << Endl); - + } + DBG(Cerr << "process finished" << Endl); + // What's the reason of process exit. // We need to set exit code before waiting for input thread // Otherwise there is no way for input stream provider to discover // that process has exited and stream shouldn't wait for new data. - bool cleanExit = false; + bool cleanExit = false; TMaybe<int> processExitCode; -#if defined(_unix_) +#if defined(_unix_) processExitCode = WEXITSTATUS(status); if (WIFEXITED(status) && processExitCode == 0) { - cleanExit = true; + cleanExit = true; } else if (WIFSIGNALED(status)) { processExitCode = -WTERMSIG(status); } -#else - if (waitPidResult == WAIT_OBJECT_0) { - DWORD exitCode = STILL_ACTIVE; +#else + if (waitPidResult == WAIT_OBJECT_0) { + DWORD exitCode = STILL_ACTIVE; if (!GetExitCodeProcess(pi->Parent->Pid, &exitCode)) { - ythrow yexception() << "GetExitCodeProcess: " << LastSystemErrorText(); + ythrow yexception() << "GetExitCodeProcess: " << LastSystemErrorText(); } - if (exitCode == 0) - cleanExit = true; + if (exitCode == 0) + cleanExit = true; processExitCode = static_cast<int>(exitCode); - DBG(Cerr << "exit code: " << exitCode << Endl); + DBG(Cerr << "exit code: " << exitCode << Endl); } -#endif +#endif pi->Parent->ExitCode = processExitCode; - if (cleanExit) { + if (cleanExit) { AtomicSet(pi->Parent->ExecutionStatus, SHELL_FINISHED); - } else { + } else { AtomicSet(pi->Parent->ExecutionStatus, SHELL_ERROR); } @@ -1108,18 +1108,18 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { } #endif } catch (const yexception& e) { - // Some error in watch occured, set result to error + // Some error in watch occured, set result to error AtomicSet(pi->Parent->ExecutionStatus, SHELL_INTERNAL_ERROR); - pi->Parent->InternalError = e.what(); + pi->Parent->InternalError = e.what(); if (input) { - pi->InputFd.Close(); + pi->InputFd.Close(); } - Cdbg << "shell command internal error: " << pi->Parent->InternalError << Endl; + Cdbg << "shell command internal error: " << pi->Parent->InternalError << Endl; } - // Now we can safely delete process info struct and other data - pi->Parent->TerminateFlag = true; + // Now we can safely delete process info struct and other data + pi->Parent->TerminateFlag = true; TerminateIsRequired(pi); -} +} TShellCommand::TShellCommand(const TStringBuf cmd, const TList<TString>& args, const TShellCommandOptions& options, const TString& workdir) |