aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgleb-kov <gleb-kov@yandex-team.ru>2022-02-17 15:58:20 +0300
committergleb-kov <gleb-kov@yandex-team.ru>2022-02-17 15:58:20 +0300
commitb3e69438ecdcb34935c6731682f072368e7eafda (patch)
tree3a6c3bb7e565be576eb00a506cc47af8fe49d4f1
parent329e7bdd49970bba890ca97109ef98486b41a7cd (diff)
downloadydb-b3e69438ecdcb34935c6731682f072368e7eafda.tar.gz
TShellCommand::TImpl via TShellCommandOptions
ref:f149c0bc76521a1273f6a2c155f2bd116141b1eb
-rw-r--r--util/system/shellcommand.cpp153
-rw-r--r--util/system/shellcommand.h8
2 files changed, 65 insertions, 96 deletions
diff --git a/util/system/shellcommand.cpp b/util/system/shellcommand.cpp
index b1989b5c8c..247bf6f340 100644
--- a/util/system/shellcommand.cpp
+++ b/util/system/shellcommand.cpp
@@ -190,44 +190,28 @@ using REALPIPEHANDLE = PIPEHANDLE;
class TShellCommand::TImpl
: public TAtomicRefCount<TShellCommand::TImpl> {
private:
- TPid Pid;
TString Command;
TList<TString> Arguments;
+ TShellCommandOptions Options_;
TString WorkDir;
+
+ TShellCommandOptions::EHandleMode InputMode = TShellCommandOptions::HANDLE_STREAM;
+
+ TPid Pid;
TAtomic ExecutionStatus; // TShellCommand::ECommandStatus
+ TThread* WatchThread;
+ bool TerminateFlag = false;
+
TMaybe<int> ExitCode;
- IInputStream* InputStream;
- IOutputStream* OutputStream;
- IOutputStream* ErrorStream;
TString CollectedOutput;
TString CollectedError;
TString InternalError;
- TThread* WatchThread;
TMutex TerminateMutex;
TFileHandle InputHandle;
TFileHandle OutputHandle;
TFileHandle ErrorHandle;
- /// @todo: store const TShellCommandOptions, no need for so many vars
- bool TerminateFlag = false;
- bool ClearSignalMask = false;
- bool CloseAllFdsOnExec = false;
- bool AsyncMode = false;
- size_t PollDelayMs = 0;
- bool UseShell = false;
- bool QuoteArguments = false;
- bool DetachSession = false;
- bool CloseStreams = false;
- TAtomic ShouldCloseInput;
- TShellCommandOptions::EHandleMode InputMode = TShellCommandOptions::HANDLE_STREAM;
- TShellCommandOptions::EHandleMode OutputMode = TShellCommandOptions::HANDLE_STREAM;
- TShellCommandOptions::EHandleMode ErrorMode = TShellCommandOptions::HANDLE_STREAM;
-
- TShellCommandOptions::TUserOptions User;
- THashMap<TString, TString> Environment;
- int Nice = 0;
- std::function<void()> FuncAfterFork = {};
-
+private:
struct TProcessInfo {
TImpl* Parent;
TRealPipeHandle InputFd;
@@ -281,34 +265,17 @@ private:
public:
inline TImpl(const TStringBuf cmd, const TList<TString>& args, const TShellCommandOptions& options, const TString& workdir)
- : Pid(0)
- , Command(ToString(cmd))
+ : Command(ToString(cmd))
, Arguments(args)
+ , Options_(options)
, WorkDir(workdir)
+ , InputMode(options.InputMode)
+ , Pid(0)
, ExecutionStatus(SHELL_NONE)
- , InputStream(options.InputStream)
- , OutputStream(options.OutputStream)
- , ErrorStream(options.ErrorStream)
, WatchThread(nullptr)
, TerminateFlag(false)
- , ClearSignalMask(options.ClearSignalMask)
- , CloseAllFdsOnExec(options.CloseAllFdsOnExec)
- , AsyncMode(options.AsyncMode)
- , PollDelayMs(options.PollDelayMs)
- , UseShell(options.UseShell)
- , QuoteArguments(options.QuoteArguments)
- , DetachSession(options.DetachSession)
- , CloseStreams(options.CloseStreams)
- , ShouldCloseInput(options.ShouldCloseInput)
- , InputMode(options.InputMode)
- , OutputMode(options.OutputMode)
- , ErrorMode(options.ErrorMode)
- , User(options.User)
- , Environment(options.Environment)
- , Nice(options.Nice)
- , FuncAfterFork(options.FuncAfterFork)
{
- if (InputStream) {
+ if (Options_.InputStream) {
// TODO change usages to call SetInputStream instead of directly assigning to InputStream
InputMode = TShellCommandOptions::HANDLE_STREAM;
}
@@ -393,8 +360,8 @@ public:
if (!!Pid && (AtomicGet(ExecutionStatus) == SHELL_RUNNING)) {
bool ok =
#if defined(_unix_)
- kill(DetachSession ? -1 * Pid : Pid, SIGTERM) == 0;
- if (!ok && (errno == ESRCH) && DetachSession) {
+ kill(Options_.DetachSession ? -1 * Pid : Pid, SIGTERM) == 0;
+ if (!ok && (errno == ESRCH) && Options_.DetachSession) {
// this could fail when called before child proc completes setsid().
ok = kill(Pid, SIGTERM) == 0;
kill(-Pid, SIGTERM); // between a failed kill(-Pid) and a successful kill(Pid) a grandchild could have been spawned
@@ -415,7 +382,7 @@ public:
}
inline void CloseInput() {
- AtomicSet(ShouldCloseInput, true);
+ AtomicSet(Options_.ShouldCloseInput, true);
}
inline static bool TerminateIsRequired(void* processInfo) {
@@ -427,12 +394,12 @@ public:
pi->ErrorFd.Close();
pi->OutputFd.Close();
- if (pi->Parent->CloseStreams) {
- if (pi->Parent->ErrorStream) {
- pi->Parent->ErrorStream->Finish();
+ if (pi->Parent->Options_.CloseStreams) {
+ if (pi->Parent->Options_.ErrorStream) {
+ pi->Parent->Options_.ErrorStream->Finish();
}
- if (pi->Parent->OutputStream) {
- pi->Parent->OutputStream->Finish();
+ if (pi->Parent->Options_.OutputStream) {
+ pi->Parent->Options_.OutputStream->Finish();
}
}
@@ -520,12 +487,12 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
startup_info.cb = sizeof(startup_info);
startup_info.dwFlags = STARTF_USESTDHANDLES;
- if (OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
if (!SetHandleInformation(pipes.OutputPipeFd[1], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) {
ythrow TSystemError() << "cannot set handle info";
}
}
- if (ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
if (!SetHandleInformation(pipes.ErrorPipeFd[1], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) {
ythrow TSystemError() << "cannot set handle info";
}
@@ -536,12 +503,12 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
}
// A sockets do not work as std streams for some reason
- if (OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
startup_info.hStdOutput = pipes.OutputPipeFd[1];
} else {
startup_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE);
}
- if (ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
startup_info.hStdError = pipes.ErrorPipeFd[1];
} else {
startup_info.hStdError = GetStdHandle(STD_ERROR_HANDLE);
@@ -558,7 +525,7 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
// TString cmd = "cmd /U" + TUtf16String can be used to read unicode messages from cmd
// /A - ansi charset /Q - echo off, /C - command, /Q - special quotes
TString qcmd = GetQuotedCommand();
- TString cmd = UseShell ? "cmd /A /Q /S /C \"" + qcmd + "\"" : qcmd;
+ TString cmd = Options_.UseShell ? "cmd /A /Q /S /C \"" + qcmd + "\"" : qcmd;
// winapi can modify command text, copy it
Y_ENSURE_EX(cmd.size() < MAX_COMMAND_LINE, yexception() << "Command is too long (length=" << cmd.size() << ")");
@@ -575,8 +542,8 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
void* lpEnvironment = nullptr;
TString env;
- if (!Environment.empty()) {
- for (auto e = Environment.begin(); e != Environment.end(); ++e) {
+ if (!Options_.Environment.empty()) {
+ for (auto e = Options_.Environment.begin(); e != Options_.Environment.end(); ++e) {
env += e->first + '=' + e->second + '\0';
}
env += '\0';
@@ -588,7 +555,7 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
SetErrorMode(GetErrorMode() | SEM_NOGPFAULTERRORBOX);
#endif
BOOL res = 0;
- if (User.Name.empty() || GetUsername() == User.Name) {
+ if (Options_.User.Name.empty() || GetUsername() == Options_.User.Name) {
res = CreateProcessW(
nullptr, // image name
cmdcopy.Data(),
@@ -602,9 +569,9 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
&process_info);
} else {
res = CreateProcessWithLogonW(
- GetWString(User.Name.data()).c_str(),
+ GetWString(Options_.User.Name.data()).c_str(),
nullptr, // domain (if this parameter is NULL, the user name must be specified in UPN format)
- GetWString(User.Password.data()).c_str(),
+ GetWString(Options_.User.Password.data()).c_str(),
0, // logon flags
NULL, // image name
cmdcopy.Data(),
@@ -655,7 +622,7 @@ TString TShellCommand::TImpl::GetQuotedCommand() const {
TString quoted = Command; /// @todo command itself should be quoted too
for (const auto& argument : Arguments) {
// Don't add unnecessary quotes. It's especially important for the windows with a 32k command line length limit.
- if (QuoteArguments && ArgNeedsQuotes(argument)) {
+ if (Options_.QuoteArguments && ArgNeedsQuotes(argument)) {
::ShellQuoteArgSp(quoted, argument);
} else {
quoted.append(" ").append(argument);
@@ -667,7 +634,7 @@ TString TShellCommand::TImpl::GetQuotedCommand() const {
#if defined(_unix_)
void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const* argv, char* const* envp, const std::function<void()>& afterFork) const {
try {
- if (DetachSession) {
+ if (Options_.DetachSession) {
setsid();
}
@@ -680,12 +647,12 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const*
// some signals cannot be caught, so just ignore return value
sigaction(i, &sa, nullptr);
}
- if (ClearSignalMask) {
+ if (Options_.ClearSignalMask) {
SigEmptySet(&oldmask);
}
// clear / restore signal mask
if (SigProcMask(SIG_SETMASK, &oldmask, nullptr) != 0) {
- ythrow TSystemError() << "Cannot " << (ClearSignalMask ? "clear" : "restore") << " signal mask in child";
+ ythrow TSystemError() << "Cannot " << (Options_.ClearSignalMask ? "clear" : "restore") << " signal mask in child";
}
TFileHandle sIn(0);
@@ -701,14 +668,14 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const*
// do not close fd 0 - next open will return it and confuse all readers
/// @todo in case of real need - reopen /dev/null
}
- if (OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
pipes.OutputPipeFd[0].Close();
TFileHandle sOutNew(pipes.OutputPipeFd[1]);
sOut.LinkTo(sOutNew);
sOut.Release();
sOutNew.Release();
}
- if (ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
pipes.ErrorPipeFd[0].Close();
TFileHandle sErrNew(pipes.ErrorPipeFd[1]);
sErr.LinkTo(sErrNew);
@@ -720,19 +687,19 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const*
NFs::SetCurrentWorkingDirectory(WorkDir);
}
- if (CloseAllFdsOnExec) {
+ if (Options_.CloseAllFdsOnExec) {
for (int fd = NSystemInfo::MaxOpenFiles(); fd > STDERR_FILENO; --fd) {
fcntl(fd, F_SETFD, FD_CLOEXEC);
}
}
- if (!User.Name.empty()) {
- ImpersonateUser(User);
+ if (!Options_.User.Name.empty()) {
+ ImpersonateUser(Options_.User);
}
- if (Nice) {
+ if (Options_.Nice) {
// Don't verify Nice() call - it does not work properly with WSL https://github.com/Microsoft/WSL/issues/1838
- ::Nice(Nice);
+ ::Nice(Options_.Nice);
}
if (afterFork) {
afterFork();
@@ -762,10 +729,10 @@ void TShellCommand::TImpl::Run() {
CollectedError.clear();
TPipes pipes;
- if (OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
TRealPipeHandle::Pipe(pipes.OutputPipeFd[0], pipes.OutputPipeFd[1], CloseOnExec);
}
- if (ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
+ if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
TRealPipeHandle::Pipe(pipes.ErrorPipeFd[0], pipes.ErrorPipeFd[1], CloseOnExec);
}
if (InputMode != TShellCommandOptions::HANDLE_INHERIT) {
@@ -789,7 +756,7 @@ void TShellCommand::TImpl::Run() {
Following "const_cast"s are safe:
http://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
*/
- if (UseShell) {
+ if (Options_.UseShell) {
shellArg = GetQuotedCommand();
qargv.reserve(4);
qargv.push_back(const_cast<char*>("/bin/sh"));
@@ -809,8 +776,8 @@ void TShellCommand::TImpl::Run() {
TVector<TString> envHolder;
TVector<char*> envp;
- if (!Environment.empty()) {
- for (auto& env : Environment) {
+ if (!Options_.Environment.empty()) {
+ for (auto& env : Options_.Environment) {
envHolder.emplace_back(env.first + '=' + env.second);
envp.push_back(const_cast<char*>(envHolder.back().data()));
}
@@ -824,9 +791,9 @@ void TShellCommand::TImpl::Run() {
ythrow TSystemError() << "Cannot fork";
} else if (pid == 0) { // child
if (envp.size() != 0) {
- OnFork(pipes, oldmask, qargv.data(), envp.data(), FuncAfterFork);
+ OnFork(pipes, oldmask, qargv.data(), envp.data(), Options_.FuncAfterFork);
} else {
- OnFork(pipes, oldmask, qargv.data(), nullptr, FuncAfterFork);
+ OnFork(pipes, oldmask, qargv.data(), nullptr, Options_.FuncAfterFork);
}
} else { // parent
// restore signal mask
@@ -849,19 +816,19 @@ void TShellCommand::TImpl::Run() {
InputHandle.Swap(inputHandle);
}
- if (OutputMode == TShellCommandOptions::HANDLE_PIPE) {
+ if (Options_.OutputMode == TShellCommandOptions::HANDLE_PIPE) {
TFileHandle outputHandle(pipes.OutputPipeFd[0].Release());
OutputHandle.Swap(outputHandle);
}
- if (ErrorMode == TShellCommandOptions::HANDLE_PIPE) {
+ if (Options_.ErrorMode == TShellCommandOptions::HANDLE_PIPE) {
TFileHandle errorHandle(pipes.ErrorPipeFd[0].Release());
ErrorHandle.Swap(errorHandle);
}
TProcessInfo* processInfo = new TProcessInfo(this,
pipes.InputPipeFd[1].Release(), pipes.OutputPipeFd[0].Release(), pipes.ErrorPipeFd[0].Release());
- if (AsyncMode) {
+ if (Options_.AsyncMode) {
WatchThread = new TThread(&TImpl::WatchProcess, processInfo);
WatchThread->Start();
/// @todo wait for child to start its process session (if options.Detach)
@@ -874,18 +841,18 @@ void TShellCommand::TImpl::Run() {
void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
THolder<IOutputStream> outputHolder;
- IOutputStream* output = pi->Parent->OutputStream;
+ IOutputStream* output = pi->Parent->Options_.OutputStream;
if (!output) {
outputHolder.Reset(output = new TStringOutput(pi->Parent->CollectedOutput));
}
THolder<IOutputStream> errorHolder;
- IOutputStream* error = pi->Parent->ErrorStream;
+ IOutputStream* error = pi->Parent->Options_.ErrorStream;
if (!error) {
errorHolder.Reset(error = new TStringOutput(pi->Parent->CollectedError));
}
- IInputStream*& input = pi->Parent->InputStream;
+ IInputStream*& input = pi->Parent->Options_.InputStream;
#if defined(_unix_)
// not really needed, io is done via poll
@@ -911,7 +878,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
streamThreads.emplace_back(new TThread(&TImpl::ReadStream, &pumps[1]));
if (input) {
- pumps[2] = {&pi->InputFd, nullptr, input, &pi->Parent->ShouldCloseInput};
+ pumps[2] = {&pi->InputFd, nullptr, input, &pi->Parent->Options_.ShouldCloseInput};
streamThreads.emplace_back(new TThread(&TImpl::WriteStream, &pumps[2]));
}
@@ -939,7 +906,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
#if defined(_unix_)
waitpid(pi->Parent->Pid, &status, WNOHANG);
#else
- WaitForSingleObject(pi->Parent->Pid /* process_info.hProcess */, pi->Parent->PollDelayMs /* ms */);
+ WaitForSingleObject(pi->Parent->Pid /* process_info.hProcess */, pi->Parent->Options_.PollDelayMs /* ms */);
Y_UNUSED(status);
#endif
// DBG(Cerr << "wait result: " << waitPidResult << Endl);
@@ -986,7 +953,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
fds[2].events = 0;
}
- res = PollD(fds, 3, TInstant::Now() + TDuration::MilliSeconds(pi->Parent->PollDelayMs));
+ res = PollD(fds, 3, TInstant::Now() + TDuration::MilliSeconds(pi->Parent->Options_.PollDelayMs));
// DBG(Cerr << "poll result: " << res << Endl);
if (-res == ETIMEDOUT || res == 0) {
// DBG(Cerr << "poll again..." << Endl);
@@ -1035,7 +1002,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
if (!bytesToWrite) {
bytesToWrite = input->Read(inputBuffer.Data(), inputBuffer.Capacity());
if (bytesToWrite == 0) {
- if (AtomicGet(pi->Parent->ShouldCloseInput)) {
+ if (AtomicGet(pi->Parent->Options_.ShouldCloseInput)) {
input = nullptr;
}
continue;
diff --git a/util/system/shellcommand.h b/util/system/shellcommand.h
index 8730627fe5..51b08e04b6 100644
--- a/util/system/shellcommand.h
+++ b/util/system/shellcommand.h
@@ -42,7 +42,7 @@ public:
: ClearSignalMask(false)
, CloseAllFdsOnExec(false)
, AsyncMode(false)
- , PollDelayMs(DefaultSyncPollDelay)
+ , PollDelayMs(DefaultSyncPollDelayMs)
, UseShell(true)
, QuoteArguments(true)
, DetachSession(true)
@@ -296,6 +296,9 @@ public:
}
public:
+ static constexpr size_t DefaultSyncPollDelayMs = 1000;
+
+public:
bool ClearSignalMask = false;
bool CloseAllFdsOnExec = false;
bool AsyncMode = false;
@@ -304,7 +307,7 @@ public:
bool QuoteArguments = false;
bool DetachSession = false;
bool CloseStreams = false;
- bool ShouldCloseInput = false;
+ TAtomic ShouldCloseInput = false;
EHandleMode InputMode = HANDLE_STREAM;
EHandleMode OutputMode = HANDLE_STREAM;
EHandleMode ErrorMode = HANDLE_STREAM;
@@ -321,7 +324,6 @@ public:
THashMap<TString, TString> Environment;
int Nice = 0;
- static const size_t DefaultSyncPollDelay = 1000; // ms
std::function<void()> FuncAfterFork = {};
};