#include "shellcommand.h"
#include "user.h"
#include "nice.h"
#include "sigset.h"

#include <util/folder/dirut.h>
#include <util/generic/algorithm.h>
#include <util/generic/buffer.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/memory/tempbuf.h>
#include <util/network/socket.h>
#include <util/stream/pipe.h>
#include <util/stream/str.h>
#include <util/string/cast.h>
#include <util/system/info.h>

#include <errno.h>

#if defined(_unix_)
    #include <unistd.h>
    #include <fcntl.h>
    #include <grp.h>
    #include <sys/wait.h>

using TPid = pid_t;
using TWaitResult = pid_t;
using TExitStatus = int;
    #define WAIT_PROCEED 0

    #if defined(_darwin_)
using TGetGroupListGid = int;
    #else
using TGetGroupListGid = gid_t;
    #endif
#elif defined(_win_)
    #include <string>

    #include "winint.h"

using TPid = HANDLE;
using TWaitResult = DWORD;
using TExitStatus = DWORD;
    #define WAIT_PROCEED WAIT_TIMEOUT

    #pragma warning(disable : 4296) // 'wait_result >= WAIT_OBJECT_0' : expression is always true
#else
    #error("unknown os, shell command is not implemented")
#endif

#define DBG(stmt) \
    {}
// #define DBG(stmt) stmt

namespace {
    // Capacity, in bytes, of the scratch buffers (TBuffer) used to move data between
    // the child's stdin/stdout/stderr pipes and the caller-side streams: 128 KiB.
    constexpr static size_t DATA_BUFFER_SIZE = 128 * 1024;

#if defined(_unix_)
    void SetUserGroups(const passwd* pw) {
        int ngroups = 1;
        THolder<gid_t, TFree> groups = THolder<gid_t, TFree>(static_cast<gid_t*>(malloc(ngroups * sizeof(gid_t))));
        if (getgrouplist(pw->pw_name, pw->pw_gid, reinterpret_cast<TGetGroupListGid*>(groups.Get()), &ngroups) == -1) {
            groups.Reset(static_cast<gid_t*>(malloc(ngroups * sizeof(gid_t))));
            if (getgrouplist(pw->pw_name, pw->pw_gid, reinterpret_cast<TGetGroupListGid*>(groups.Get()), &ngroups) == -1) {
                ythrow TSystemError() << "getgrouplist failed: user " << pw->pw_name << " (" << pw->pw_uid << ")";
            }
        }
        if (setgroups(ngroups, groups.Get()) == -1) {
            ythrow TSystemError(errno) << "Unable to set groups for user " << pw->pw_name << Endl;
        }
    }

    // Switches the calling process to the user named in `userOpts.Name`.
    //
    // Does nothing when GetUsername() already equals that name. Otherwise looks the
    // account up with getpwnam(), optionally installs its supplementary groups
    // (userOpts.UseUserGroups) and finally calls setuid().
    //
    // Throws TSystemError when getpwnam() finds no entry or setuid() fails.
    // NOTE(review): only the uid is changed here; no setgid() call is made for the
    // account's primary group - confirm this is intended.
    void ImpersonateUser(const TShellCommandOptions::TUserOptions& userOpts) {
        const bool alreadyTargetUser = (GetUsername() == userOpts.Name);
        if (alreadyTargetUser) {
            return;
        }

        const passwd* const account = getpwnam(userOpts.Name.c_str());
        if (account == nullptr) {
            ythrow TSystemError(errno) << "getpwnam failed";
        }

        // Groups must be installed before the uid is switched.
        if (userOpts.UseUserGroups) {
            SetUserGroups(account);
        }

        if (setuid(account->pw_uid) != 0) {
            ythrow TSystemError(errno) << "setuid failed";
        }
    }
#elif defined(_win_)
    // Upper bound (in characters, including the terminating zero) on the command line
    // passed to CreateProcess*; also the size of the mutable wchar_t copy made for it.
    constexpr static size_t MAX_COMMAND_LINE = 32 * 1024;

    // Widens a zero-terminated narrow string element by element (each char is
    // converted to wchar_t individually; no code-page or UTF-8 decoding is done).
    // A null pointer yields an empty string.
    std::wstring GetWString(const char* astring) {
        std::wstring widened;
        if (astring != nullptr) {
            const std::string narrow(astring);
            widened.assign(narrow.begin(), narrow.end());
        }
        return widened;
    }

    // Narrows a zero-terminated wide string element by element (each wchar_t is
    // converted to char individually, so characters that do not fit are truncated).
    // A null pointer yields an empty string.
    std::string GetAString(const wchar_t* wstring) {
        std::string narrowed;
        if (wstring != nullptr) {
            const std::wstring wide(wstring);
            narrowed.assign(wide.begin(), wide.end());
        }
        return narrowed;
    }
#endif
}

// temporary measure to avoid rewriting all poll calls on win TPipeHandle
#if defined(_win_)
using REALPIPEHANDLE = HANDLE;
    #define INVALID_REALPIPEHANDLE INVALID_HANDLE_VALUE

class TRealPipeHandle
    : public TNonCopyable {
public:
    inline TRealPipeHandle() noexcept
        : Fd_(INVALID_REALPIPEHANDLE)
    {
    }

    inline TRealPipeHandle(REALPIPEHANDLE fd) noexcept
        : Fd_(fd)
    {
    }

    inline ~TRealPipeHandle() {
        Close();
    }

    bool Close() noexcept {
        bool ok = true;
        if (Fd_ != INVALID_REALPIPEHANDLE)
            ok = CloseHandle(Fd_);
        Fd_ = INVALID_REALPIPEHANDLE;
        return ok;
    }

    inline REALPIPEHANDLE Release() noexcept {
        REALPIPEHANDLE ret = Fd_;
        Fd_ = INVALID_REALPIPEHANDLE;
        return ret;
    }

    inline void Swap(TRealPipeHandle& r) noexcept {
        DoSwap(Fd_, r.Fd_);
    }

    inline operator REALPIPEHANDLE() const noexcept {
        return Fd_;
    }

    inline bool IsOpen() const noexcept {
        return Fd_ != INVALID_REALPIPEHANDLE;
    }

    ssize_t Read(void* buffer, size_t byteCount) const noexcept {
        DWORD doneBytes;
        if (!ReadFile(Fd_, buffer, byteCount, &doneBytes, nullptr))
            return -1;
        return doneBytes;
    }
    ssize_t Write(const void* buffer, size_t byteCount) const noexcept {
        DWORD doneBytes;
        if (!WriteFile(Fd_, buffer, byteCount, &doneBytes, nullptr))
            return -1;
        return doneBytes;
    }

    static void Pipe(TRealPipeHandle& reader, TRealPipeHandle& writer, EOpenMode mode) {
        (void)mode;
        REALPIPEHANDLE fds[2];
        if (!CreatePipe(&fds[0], &fds[1], nullptr /* handles are not inherited */, 0))
            ythrow TFileError() << "failed to create a pipe";
        TRealPipeHandle(fds[0]).Swap(reader);
        TRealPipeHandle(fds[1]).Swap(writer);
    }

private:
    REALPIPEHANDLE Fd_;
};

#else
using TRealPipeHandle = TPipeHandle;
using REALPIPEHANDLE = PIPEHANDLE;
    #define INVALID_REALPIPEHANDLE INVALID_PIPEHANDLE
#endif

// Reference-counted implementation object behind TShellCommand.
//
// Holds the command line, the options it was created with, the identity of the
// spawned child (pid on unix, process HANDLE on Windows), the execution status and
// the text collected from the child's stdout/stderr. Run() starts the child;
// Communicate() exchanges data with it and records how it ended.
class TShellCommand::TImpl
    : public TAtomicRefCount<TShellCommand::TImpl> {
private:
    // Program (or shell snippet) to execute and its arguments.
    TString Command;
    TList<TString> Arguments;
    // Private copy of the options passed to the constructor.
    TShellCommandOptions Options_;
    // Working directory for the child; empty means "do not change".
    TString WorkDir;

    // How the child's stdin is wired; forced to HANDLE_STREAM by the constructor
    // when Options_.InputStream is set.
    TShellCommandOptions::EHandleMode InputMode = TShellCommandOptions::HANDLE_STREAM;

    // pid_t on unix, process HANDLE on Windows (see the TPid alias).
    TPid Pid;
    std::atomic<size_t> ExecutionStatus; // TShellCommand::ECommandStatus
    // Thread running WatchProcess() when Options_.AsyncMode is set; nullptr otherwise.
    TThread* WatchThread;
    // Set by the destructor (under TerminateMutex) to make Communicate() bail out.
    bool TerminateFlag = false;

    // Exit code of the child (negated signal number on unix when it was killed by a
    // signal); assigned by Communicate().
    TMaybe<int> ExitCode;
    // Text captured from the child when no Options_.OutputStream / ErrorStream is set.
    TString CollectedOutput;
    TString CollectedError;
    // Returned by GetInternalError(); it is not assigned in the part of the file
    // visible here.
    TString InternalError;
    // Guards TerminateFlag.
    TMutex TerminateMutex;
    // Parent-side pipe ends handed out to the caller when the corresponding mode is
    // HANDLE_PIPE (filled in by Run()).
    TFileHandle InputHandle;
    TFileHandle OutputHandle;
    TFileHandle ErrorHandle;

private:
    // State handed to Communicate()/WatchProcess(): the owning TImpl plus the
    // parent-side pipe ends used to talk to the child.
    struct TProcessInfo {
        TImpl* Parent;
        TRealPipeHandle InputFd;
        TRealPipeHandle OutputFd;
        TRealPipeHandle ErrorFd;
        TProcessInfo(TImpl* parent, REALPIPEHANDLE inputFd, REALPIPEHANDLE outputFd, REALPIPEHANDLE errorFd)
            : Parent(parent)
            , InputFd(inputFd)
            , OutputFd(outputFd)
            , ErrorFd(errorFd)
        {
        }
    };

    // The three pipes created by Run(). Index 0 is the read end and index 1 the
    // write end (they are passed to Pipe() as reader/writer in that order).
    struct TPipes {
        TRealPipeHandle OutputPipeFd[2];
        TRealPipeHandle ErrorPipeFd[2];
        TRealPipeHandle InputPipeFd[2];
        // pipes are closed by automatic dtor
        // Closes, in the parent, the ends that belong to the child: the write ends
        // of stdout/stderr and the read end of stdin.
        void PrepareParents() {
            if (OutputPipeFd[1].IsOpen()) {
                OutputPipeFd[1].Close();
            }
            if (ErrorPipeFd[1].IsOpen()) {
                ErrorPipeFd[1].Close();
            }
            // NOTE(review): the condition tests the write end ([1]) while the read
            // end ([0]) is the one being closed; both are created together by Run().
            if (InputPipeFd[1].IsOpen()) {
                InputPipeFd[0].Close();
            }
        }
        // Detaches the parent-side ends via Release() so that this struct no longer
        // refers to them.
        void ReleaseParents() {
            InputPipeFd[1].Release();
            OutputPipeFd[0].Release();
            ErrorPipeFd[0].Release();
        }
    };

    // Argument block for the ReadStream()/WriteStream() thread bodies.
    struct TPipePump {
        // Pipe end to read from / write to.
        TRealPipeHandle* Pipe;
        // Destination for ReadStream(); unused by WriteStream().
        IOutputStream* OutputStream;
        // Source for WriteStream(); unused by ReadStream().
        IInputStream* InputStream;
        // When the source yields 0 bytes, WriteStream() stops only if this is true.
        std::atomic<bool>* ShouldClosePipe;
        // Message of an exception caught inside the thread body, if any.
        TString InternalError;
    };

#if defined(_unix_)
    // Child-side half of fork(): sets up the process and execs the command.
    void OnFork(TPipes& pipes, sigset_t oldmask, char* const* argv, char* const* envp, const std::function<void()>& afterFork) const;
#else
    // Creates the child with CreateProcess*; defined after the class.
    void StartProcess(TPipes& pipes);
#endif

public:
    // Copies the command, arguments, options and working directory. Nothing is
    // started here; the status is SHELL_NONE until Run() is called.
    inline TImpl(const TStringBuf cmd, const TList<TString>& args, const TShellCommandOptions& options, const TString& workdir)
        : Command(ToString(cmd))
        , Arguments(args)
        , Options_(options)
        , WorkDir(workdir)
        , InputMode(options.InputMode)
        , Pid(0)
        , ExecutionStatus(SHELL_NONE)
        , WatchThread(nullptr)
        , TerminateFlag(false)
    {
        if (Options_.InputStream) {
            // TODO change usages to call SetInputStream instead of directly assigning to InputStream
            InputMode = TShellCommandOptions::HANDLE_STREAM;
        }
    }

    // Asks a running watch thread to stop by raising TerminateFlag, then destroys
    // the thread object. On Windows also closes the process handle.
    inline ~TImpl() {
        if (WatchThread) {
            with_lock (TerminateMutex) {
                TerminateFlag = true;
            }

            delete WatchThread;
        }

#if defined(_win_)
        if (Pid) {
            CloseHandle(Pid);
        }
#endif
    }

    // Appends one argument to the command. Throws yexception while the status is
    // SHELL_RUNNING.
    inline void AppendArgument(const TStringBuf argument) {
        if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) {
            ythrow yexception() << "You cannot change command parameters while process is running";
        }
        Arguments.push_back(ToString(argument));
    }

    // Returns the collected stdout text. Throws yexception while the status is
    // SHELL_RUNNING.
    inline const TString& GetOutput() const {
        if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) {
            ythrow yexception() << "You cannot retrieve output while process is running.";
        }
        return CollectedOutput;
    }

    // Returns the collected stderr text. Throws yexception while the status is
    // SHELL_RUNNING.
    inline const TString& GetError() const {
        if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) {
            ythrow yexception() << "You cannot retrieve output while process is running.";
        }
        return CollectedError;
    }

    // Returns the internal error text. Throws yexception unless the status is
    // SHELL_INTERNAL_ERROR.
    inline const TString& GetInternalError() const {
        if (ExecutionStatus.load(std::memory_order_acquire) != SHELL_INTERNAL_ERROR) {
            ythrow yexception() << "Internal error hasn't occured so can't be retrieved.";
        }
        return InternalError;
    }

    // Current execution status (acquire load of the atomic).
    inline ECommandStatus GetStatus() const {
        return static_cast<ECommandStatus>(ExecutionStatus.load(std::memory_order_acquire));
    }

    // Copy of the stored exit code; ExitCode is only assigned by Communicate().
    inline TMaybe<int> GetExitCode() const {
        return ExitCode;
    }

    // Numeric process id of the child (derived from the process HANDLE on Windows).
    inline TProcessId GetPid() const {
#if defined(_win_)
        return GetProcessId(Pid);
#else
        return Pid;
#endif
    }

    // Accessors for the caller-facing pipe ends; Run() fills each one only when the
    // corresponding mode is HANDLE_PIPE.
    inline TFileHandle& GetInputHandle() {
        return InputHandle;
    }

    inline TFileHandle& GetOutputHandle() {
        return OutputHandle;
    }

    inline TFileHandle& GetErrorHandle() {
        return ErrorHandle;
    }

    // start child process
    void Run();

    // Asks the child to terminate. Does nothing unless a child id is stored and the
    // status is SHELL_RUNNING. On unix sends SIGTERM - to the whole process group
    // (negative pid) when Options_.DetachSession is set, otherwise to the child
    // itself; on Windows calls TerminateProcess with exit code 1.
    // Throws TSystemError when the request fails.
    inline void Terminate() {
        if (!!Pid && (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING)) {
            // The initializer of `ok` is supplied by whichever platform branch is compiled.
            bool ok =
#if defined(_unix_)
                kill(Options_.DetachSession ? -1 * Pid : Pid, SIGTERM) == 0;
            if (!ok && (errno == ESRCH) && Options_.DetachSession) {
                // this could fail when called before child proc completes setsid().
                ok = kill(Pid, SIGTERM) == 0;
                kill(-Pid, SIGTERM); // between a failed kill(-Pid) and a successful kill(Pid) a grandchild could have been spawned
            }
#else
                TerminateProcess(Pid, 1 /* exit code */);
#endif
            if (!ok) {
                ythrow TSystemError() << "cannot terminate " << Pid;
            }
        }
    }

    // Joins the watch thread, if there is one (async mode).
    inline void Wait() {
        if (WatchThread) {
            WatchThread->Join();
        }
    }

    // Raises Options_.ShouldCloseInput; the input pump stops feeding the child the
    // next time the input stream returns 0 bytes.
    inline void CloseInput() {
        Options_.ShouldCloseInput.store(true);
    }

    // Checks whether the owner requested termination (TerminateFlag). Communicate()
    // calls this while holding TerminateMutex.
    // `processInfo` must point to a TProcessInfo. When the flag is set, the pipe ends
    // are closed, the user streams are finished if Options_.CloseStreams is set, the
    // TProcessInfo is deleted and true is returned - the pointer must not be used
    // afterwards. Otherwise returns false and leaves everything untouched.
    inline static bool TerminateIsRequired(void* processInfo) {
        TProcessInfo* pi = reinterpret_cast<TProcessInfo*>(processInfo);
        if (!pi->Parent->TerminateFlag) {
            return false;
        }
        pi->InputFd.Close();
        pi->ErrorFd.Close();
        pi->OutputFd.Close();

        if (pi->Parent->Options_.CloseStreams) {
            if (pi->Parent->Options_.ErrorStream) {
                pi->Parent->Options_.ErrorStream->Finish();
            }
            if (pi->Parent->Options_.OutputStream) {
                pi->Parent->Options_.OutputStream->Finish();
            }
        }

        delete pi;
        return true;
    }

    // interchange io while process is alive
    inline static void Communicate(TProcessInfo* pi);

    // Thread entry point for async mode: `data` is the TProcessInfo created by Run().
    inline static void* WatchProcess(void* data) {
        TProcessInfo* pi = reinterpret_cast<TProcessInfo*>(data);
        Communicate(pi);
        return nullptr;
    }

    // Thread body (used by the Windows branch of Communicate()): copies everything
    // readable from pump->Pipe into pump->OutputStream until a read returns 0 or a
    // negative value, then closes the pipe. `data` must point to a TPipePump.
    // Exceptions are not propagated; their message is stored in pump->InternalError.
    inline static void* ReadStream(void* data) noexcept {
        TPipePump* pump = reinterpret_cast<TPipePump*>(data);
        try {
            int bytes = 0;
            TBuffer buffer(DATA_BUFFER_SIZE);

            while (true) {
                bytes = pump->Pipe->Read(buffer.Data(), buffer.Capacity());
                if (bytes > 0) {
                    pump->OutputStream->Write(buffer.Data(), bytes);
                } else {
                    break;
                }
            }
            if (pump->Pipe->IsOpen()) {
                pump->Pipe->Close();
            }
        } catch (...) {
            pump->InternalError = CurrentExceptionMessage();
        }
        return nullptr;
    }

    // Thread body (used by the Windows branch of Communicate()): feeds data from
    // pump->InputStream into pump->Pipe, then closes the pipe. `data` must point to
    // a TPipePump. The loop ends when a pipe write returns 0 or a negative value, or
    // when the stream yields 0 bytes while *pump->ShouldClosePipe is true.
    // Exceptions are not propagated; their message is stored in pump->InternalError.
    inline static void* WriteStream(void* data) noexcept {
        TPipePump* pump = reinterpret_cast<TPipePump*>(data);
        try {
            int bytes = 0;
            // Bytes of the current chunk that are still to be written, and where they start.
            int bytesToWrite = 0;
            char* bufPos = nullptr;
            TBuffer buffer(DATA_BUFFER_SIZE);

            while (true) {
                if (!bytesToWrite) {
                    bytesToWrite = pump->InputStream->Read(buffer.Data(), buffer.Capacity());
                    if (bytesToWrite == 0) {
                        if (pump->ShouldClosePipe->load(std::memory_order_acquire)) {
                            break;
                        }
                        // No data and no close request: the read is retried
                        // immediately (there is no sleep in this loop).
                        continue;
                    }
                    bufPos = buffer.Data();
                }

                // A write may be partial; the remainder is sent on the next iteration.
                bytes = pump->Pipe->Write(bufPos, bytesToWrite);
                if (bytes > 0) {
                    bytesToWrite -= bytes;
                    bufPos += bytes;
                } else {
                    break;
                }
            }
            if (pump->Pipe->IsOpen()) {
                pump->Pipe->Close();
            }
        } catch (...) {
            pump->InternalError = CurrentExceptionMessage();
        }
        return nullptr;
    }

    // Builds the single-string command line: Command followed by the arguments,
    // quoted where needed when Options_.QuoteArguments is set.
    TString GetQuotedCommand() const;
};

#if defined(_win_)
// Windows implementation of process creation.
//
// Marks the child-side pipe ends in `pipes` as inheritable, wires them (or the
// parent's own std handles for HANDLE_INHERIT modes) into STARTUPINFO, builds the
// command line and environment block and starts the child with CreateProcessW, or
// with CreateProcessWithLogonW when Options_.User names a different user.
//
// On success the process handle is stored in Pid and the primary thread handle is
// closed. On failure ExecutionStatus becomes SHELL_ERROR, a description is appended
// to CollectedError and Pid is left untouched.
//
// Throws TSystemError if a pipe handle cannot be made inheritable and yexception if
// the command line does not fit into MAX_COMMAND_LINE characters.
void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
    // Setup STARTUPINFO to redirect handles.
    STARTUPINFOW startup_info;
    ZeroMemory(&startup_info, sizeof(startup_info));
    startup_info.cb = sizeof(startup_info);
    startup_info.dwFlags = STARTF_USESTDHANDLES;

    if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
        if (!SetHandleInformation(pipes.OutputPipeFd[1], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) {
            ythrow TSystemError() << "cannot set handle info";
        }
    }
    if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
        if (!SetHandleInformation(pipes.ErrorPipeFd[1], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) {
            ythrow TSystemError() << "cannot set handle info";
        }
    }
    if (InputMode != TShellCommandOptions::HANDLE_INHERIT) {
        if (!SetHandleInformation(pipes.InputPipeFd[0], HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT)) {
            ythrow TSystemError() << "cannot set handle info";
        }
    }

    // Sockets do not work as std streams for some reason
    if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
        startup_info.hStdOutput = pipes.OutputPipeFd[1];
    } else {
        startup_info.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE);
    }
    if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
        startup_info.hStdError = pipes.ErrorPipeFd[1];
    } else {
        startup_info.hStdError = GetStdHandle(STD_ERROR_HANDLE);
    }
    if (InputMode != TShellCommandOptions::HANDLE_INHERIT) {
        startup_info.hStdInput = pipes.InputPipeFd[0];
    } else {
        // Don't leave hStdInput unfilled, otherwise any attempt to retrieve the operating-system file handle
        // that is associated with the specified file descriptor will lead to errors.
        startup_info.hStdInput = GetStdHandle(STD_INPUT_HANDLE);
    }

    // Zero-initialized so that no indeterminate handle value can ever be observed.
    PROCESS_INFORMATION process_info;
    ZeroMemory(&process_info, sizeof(process_info));
    // TString cmd = "cmd /U" + TUtf16String can be used to read unicode messages from cmd
    // /A - ansi charset, /Q - echo off, /S - special quotes handling, /C - command
    TString qcmd = GetQuotedCommand();
    TString cmd = Options_.UseShell ? "cmd /A /Q /S /C \"" + qcmd + "\"" : qcmd;
    // winapi can modify command text, copy it

    Y_ENSURE_EX(cmd.size() < MAX_COMMAND_LINE, yexception() << "Command is too long (length=" << cmd.size() << ")");
    TTempArray<wchar_t> cmdcopy(MAX_COMMAND_LINE);
    Copy(cmd.data(), cmd.data() + cmd.size(), cmdcopy.Data());
    *(cmdcopy.Data() + cmd.size()) = 0;

    const wchar_t* cwd = NULL;
    std::wstring cwdBuff;
    if (WorkDir.size()) {
        cwdBuff = GetWString(WorkDir.data());
        cwd = cwdBuff.c_str();
    }

    // Environment block: "NAME=VALUE\0" entries followed by one extra '\0'.
    // nullptr makes the child inherit the parent's environment.
    void* lpEnvironment = nullptr;
    TString env;
    if (!Options_.Environment.empty()) {
        for (const auto& e : Options_.Environment) {
            env += e.first + '=' + e.second + '\0';
        }
        env += '\0';
        lpEnvironment = const_cast<char*>(env.data());
    }

    // disable messagebox (may be in debug too)
    #ifndef NDEBUG
    SetErrorMode(GetErrorMode() | SEM_NOGPFAULTERRORBOX);
    #endif
    BOOL res = 0;
    if (Options_.User.Name.empty() || GetUsername() == Options_.User.Name) {
        res = CreateProcessW(
            nullptr, // image name
            cmdcopy.Data(),
            nullptr,       // process security attributes
            nullptr,       // thread security attributes
            TRUE,          // inherit handles - needed for IO, CloseAllFdsOnExec not respected
            0,             // obscure creation flags
            lpEnvironment, // environment
            cwd,           // current directory
            &startup_info,
            &process_info);
    } else {
        res = CreateProcessWithLogonW(
            GetWString(Options_.User.Name.data()).c_str(),
            nullptr, // domain (if this parameter is NULL, the user name must be specified in UPN format)
            GetWString(Options_.User.Password.data()).c_str(),
            0,    // logon flags
            NULL, // image name
            cmdcopy.Data(),
            0,             // obscure creation flags
            lpEnvironment, // environment
            cwd,           // current directory
            &startup_info,
            &process_info);
    }

    if (!res) {
        ExecutionStatus.store(SHELL_ERROR, std::memory_order_release);
        /// @todo: write to error stream if set
        TStringOutput out(CollectedError);
        out << "Process was not created: " << LastSystemErrorText() << " command text was: '" << GetAString(cmdcopy.Data()) << "'";
        // process_info holds no valid handles when creation failed: do not store
        // hProcess in Pid and do not call CloseHandle on hThread.
        return;
    }
    Pid = process_info.hProcess;
    CloseHandle(process_info.hThread);
    DBG(Cerr << "created process id " << Pid << " in dir: " << cwd << ", cmd: " << cmdcopy.Data() << Endl);
}
#endif

// Appends `argument` to `dst` wrapped in double quotes, writing every embedded
// double quote as \" . No other character is escaped.
void ShellQuoteArg(TString& dst, TStringBuf argument) {
    dst.append("\"");
    // Peel off one piece per embedded quote; `argument` keeps the unprocessed tail.
    for (TStringBuf piece, tail; argument.TrySplit('"', piece, tail); argument = tail) {
        dst.append(piece);
        dst.append("\\\"");
    }
    dst.append(argument);
    dst.append("\"");
}

// Same as ShellQuoteArg, but puts a single space before the quoted argument.
void ShellQuoteArgSp(TString& dst, TStringBuf argument) {
    dst += ' ';
    ::ShellQuoteArg(dst, argument);
}

// Tells whether an argument has to be quoted before it is placed on a command
// line: true for an empty argument and for one containing a space, tab, quote
// character or one of & ( ) * < > \ ` ^ | .
bool ArgNeedsQuotes(TStringBuf arg) noexcept {
    return arg.empty() || arg.find_first_of(" \"\'\t&()*<>\\`^|") != TString::npos;
}

// Builds the one-string command line: Command followed by every argument, each
// preceded by a space. An argument is quoted only when Options_.QuoteArguments is
// set and ArgNeedsQuotes() says it is necessary; otherwise it is appended verbatim.
TString TShellCommand::TImpl::GetQuotedCommand() const {
    TString commandLine = Command; /// @todo command itself should be quoted too
    for (const auto& arg : Arguments) {
        // Skip unnecessary quotes: Windows limits the command line to 32k characters.
        if (!Options_.QuoteArguments || !ArgNeedsQuotes(arg)) {
            commandLine.append(" ").append(arg);
            continue;
        }
        ::ShellQuoteArgSp(commandLine, arg);
    }
    return commandLine;
}

#if defined(_unix_)
// Child-side half of Run() on unix: called in the forked child and never returns.
// Either one of the exec calls replaces the process image, or a diagnostic is
// written to Cerr and the child ends with _exit(-1).
//
// pipes     - pipes created by Run(); the child keeps its own ends and closes the
//             parent's ends.
// oldmask   - signal mask the parent had before it blocked all signals; restored
//             here unless Options_.ClearSignalMask asks for an empty mask.
// argv      - NULL-terminated argument vector; argv[0] is the program to execute.
// envp      - NULL-terminated environment, or nullptr to keep the current one.
// afterFork - optional callback invoked right before exec.
void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const* argv, char* const* envp, const std::function<void()>& afterFork) const {
    try {
        if (Options_.DetachSession) {
            // Return value is not checked.
            setsid();
        }

        // reset signal handlers from parent
        struct sigaction sa;
        sa.sa_handler = SIG_DFL;
        sa.sa_flags = 0;
        SigEmptySet(&sa.sa_mask);
        for (int i = 0; i < NSIG; ++i) {
            // some signals cannot be caught, so just ignore return value
            sigaction(i, &sa, nullptr);
        }
        if (Options_.ClearSignalMask) {
            SigEmptySet(&oldmask);
        }
        // clear / restore signal mask
        if (SigProcMask(SIG_SETMASK, &oldmask, nullptr) != 0) {
            ythrow TSystemError() << "Cannot " << (Options_.ClearSignalMask ? "clear" : "restore") << " signal mask in child";
        }

        // Wrappers around descriptors 0, 1 and 2. Each redirected stream is pointed at
        // its pipe end with LinkTo() (presumably dup2-like; its implementation is not
        // visible here), after which both wrappers are Release()d so that neither
        // descriptor stays owned by a wrapper.
        // NOTE(review): a wrapper of a stream that is not redirected is never released
        // - check what its destructor does if an exception leaves this scope.
        TFileHandle sIn(0);
        TFileHandle sOut(1);
        TFileHandle sErr(2);
        if (InputMode != TShellCommandOptions::HANDLE_INHERIT) {
            // The write end of the stdin pipe belongs to the parent.
            pipes.InputPipeFd[1].Close();
            TFileHandle sInNew(pipes.InputPipeFd[0]);
            sIn.LinkTo(sInNew);
            sIn.Release();
            sInNew.Release();
        } else {
            // do not close fd 0 - next open will return it and confuse all readers
            /// @todo in case of real need - reopen /dev/null
        }
        if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
            // The read end of the stdout pipe belongs to the parent.
            pipes.OutputPipeFd[0].Close();
            TFileHandle sOutNew(pipes.OutputPipeFd[1]);
            sOut.LinkTo(sOutNew);
            sOut.Release();
            sOutNew.Release();
        }
        if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
            // The read end of the stderr pipe belongs to the parent.
            pipes.ErrorPipeFd[0].Close();
            TFileHandle sErrNew(pipes.ErrorPipeFd[1]);
            sErr.LinkTo(sErrNew);
            sErr.Release();
            sErrNew.Release();
        }

        if (WorkDir.size()) {
            NFs::SetCurrentWorkingDirectory(WorkDir);
        }

        if (Options_.CloseAllFdsOnExec) {
            // Mark every descriptor above stderr close-on-exec; failures (e.g. for
            // descriptors that are not open) are ignored.
            for (int fd = NSystemInfo::MaxOpenFiles(); fd > STDERR_FILENO; --fd) {
                fcntl(fd, F_SETFD, FD_CLOEXEC);
            }
        }

        if (!Options_.User.Name.empty()) {
            ImpersonateUser(Options_.User);
        }

        if (Options_.Nice) {
            // Don't verify Nice() call - it does not work properly with WSL https://github.com/Microsoft/WSL/issues/1838
            ::Nice(Options_.Nice);
        }
        if (afterFork) {
            afterFork();
        }

        // Without an explicit environment execvp is used (PATH lookup, current
        // environment); with one, execve is used and argv[0] is taken as the path.
        if (envp == nullptr) {
            execvp(argv[0], argv);
        } else {
            execve(argv[0], argv, envp);
        }
        // Only reached when exec failed.
        Cerr << "Process was not created: " << LastSystemErrorText() << Endl;
    } catch (const std::exception& error) {
        Cerr << "Process was not created: " << error.what() << Endl;
    } catch (...) {
        Cerr << "Process was not created: "
             << "unknown error" << Endl;
    }

    // _exit ends the child immediately, without normal exit-time processing.
    _exit(-1);
}
#endif

void TShellCommand::TImpl::Run() {
    Y_ENSURE(ExecutionStatus.load(std::memory_order_acquire) != SHELL_RUNNING, TStringBuf("Process is already running"));
    // Prepare I/O streams
    CollectedOutput.clear();
    CollectedError.clear();
    TPipes pipes;

    if (Options_.OutputMode != TShellCommandOptions::HANDLE_INHERIT) {
        TRealPipeHandle::Pipe(pipes.OutputPipeFd[0], pipes.OutputPipeFd[1], CloseOnExec);
    }
    if (Options_.ErrorMode != TShellCommandOptions::HANDLE_INHERIT) {
        TRealPipeHandle::Pipe(pipes.ErrorPipeFd[0], pipes.ErrorPipeFd[1], CloseOnExec);
    }
    if (InputMode != TShellCommandOptions::HANDLE_INHERIT) {
        TRealPipeHandle::Pipe(pipes.InputPipeFd[0], pipes.InputPipeFd[1], CloseOnExec);
    }

    ExecutionStatus.store(SHELL_RUNNING, std::memory_order_release);

#if defined(_unix_)
    // block all signals to avoid signal handler race after fork()
    sigset_t oldmask, newmask;
    SigFillSet(&newmask);
    if (SigProcMask(SIG_SETMASK, &newmask, &oldmask) != 0) {
        ythrow TSystemError() << "Cannot block all signals in parent";
    }

    /* arguments holders */
    TString shellArg;
    TVector<char*> qargv;
    /*
      Following "const_cast"s are safe:
      http://pubs.opengroup.org/onlinepubs/9699919799/functions/exec.html
    */
    if (Options_.UseShell) {
        shellArg = GetQuotedCommand();
        qargv.reserve(4);
        qargv.push_back(const_cast<char*>("/bin/sh"));
        qargv.push_back(const_cast<char*>("-c"));
        // two args for 'sh -c -- ',
        // one for program name, and one for NULL at the end
        qargv.push_back(const_cast<char*>(shellArg.data()));
    } else {
        qargv.reserve(Arguments.size() + 2);
        qargv.push_back(const_cast<char*>(Command.data()));
        for (auto& i : Arguments) {
            qargv.push_back(const_cast<char*>(i.data()));
        }
    }

    qargv.push_back(nullptr);

    TVector<TString> envHolder;
    TVector<char*> envp;
    if (!Options_.Environment.empty()) {
        for (auto& env : Options_.Environment) {
            envHolder.emplace_back(env.first + '=' + env.second);
            envp.push_back(const_cast<char*>(envHolder.back().data()));
        }
        envp.push_back(nullptr);
    }

    pid_t pid = fork();
    if (pid == -1) {
        ExecutionStatus.store(SHELL_ERROR, std::memory_order_release);
        /// @todo check if pipes are still open
        ythrow TSystemError() << "Cannot fork";
    } else if (pid == 0) { // child
        if (envp.size() != 0) {
            OnFork(pipes, oldmask, qargv.data(), envp.data(), Options_.FuncAfterFork);
        } else {
            OnFork(pipes, oldmask, qargv.data(), nullptr, Options_.FuncAfterFork);
        }
    } else { // parent
        // restore signal mask
        if (SigProcMask(SIG_SETMASK, &oldmask, nullptr) != 0) {
            ythrow TSystemError() << "Cannot restore signal mask in parent";
        }
    }
    Pid = pid;
#else
    StartProcess(pipes);
#endif
    pipes.PrepareParents();

    if (ExecutionStatus.load(std::memory_order_acquire) != SHELL_RUNNING) {
        return;
    }

    if (InputMode == TShellCommandOptions::HANDLE_PIPE) {
        TFileHandle inputHandle(pipes.InputPipeFd[1].Release());
        InputHandle.Swap(inputHandle);
    }

    if (Options_.OutputMode == TShellCommandOptions::HANDLE_PIPE) {
        TFileHandle outputHandle(pipes.OutputPipeFd[0].Release());
        OutputHandle.Swap(outputHandle);
    }

    if (Options_.ErrorMode == TShellCommandOptions::HANDLE_PIPE) {
        TFileHandle errorHandle(pipes.ErrorPipeFd[0].Release());
        ErrorHandle.Swap(errorHandle);
    }

    TProcessInfo* processInfo = new TProcessInfo(this,
                                                 pipes.InputPipeFd[1].Release(), pipes.OutputPipeFd[0].Release(), pipes.ErrorPipeFd[0].Release());
    if (Options_.AsyncMode) {
        WatchThread = new TThread(&TImpl::WatchProcess, processInfo);
        WatchThread->Start();
        /// @todo wait for child to start its process session (if options.Detach)
    } else {
        Communicate(processInfo);
    }

    pipes.ReleaseParents(); // not needed
}

void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
    THolder<IOutputStream> outputHolder;
    IOutputStream* output = pi->Parent->Options_.OutputStream;
    if (!output) {
        outputHolder.Reset(output = new TStringOutput(pi->Parent->CollectedOutput));
    }

    THolder<IOutputStream> errorHolder;
    IOutputStream* error = pi->Parent->Options_.ErrorStream;
    if (!error) {
        errorHolder.Reset(error = new TStringOutput(pi->Parent->CollectedError));
    }

    IInputStream*& input = pi->Parent->Options_.InputStream;

#if defined(_unix_)
    // not really needed, io is done via poll
    if (pi->OutputFd.IsOpen()) {
        SetNonBlock(pi->OutputFd);
    }
    if (pi->ErrorFd.IsOpen()) {
        SetNonBlock(pi->ErrorFd);
    }
    if (pi->InputFd.IsOpen()) {
        SetNonBlock(pi->InputFd);
    }
#endif

    try {
#if defined(_win_)
        TPipePump pumps[3] = {0};
        pumps[0] = {&pi->ErrorFd, error};
        pumps[1] = {&pi->OutputFd, output};

        TVector<THolder<TThread>> streamThreads;
        streamThreads.emplace_back(new TThread(&TImpl::ReadStream, &pumps[0]));
        streamThreads.emplace_back(new TThread(&TImpl::ReadStream, &pumps[1]));

        if (input) {
            pumps[2] = {&pi->InputFd, nullptr, input, &pi->Parent->Options_.ShouldCloseInput};
            streamThreads.emplace_back(new TThread(&TImpl::WriteStream, &pumps[2]));
        }

        for (auto& threadHolder : streamThreads)
            threadHolder->Start();
#else
        TBuffer buffer(DATA_BUFFER_SIZE);
        TBuffer inputBuffer(DATA_BUFFER_SIZE);
        int bytes;
        int bytesToWrite = 0;
        char* bufPos = nullptr;
#endif
        TWaitResult waitPidResult;
        TExitStatus status = 0;

        while (true) {
            {
                with_lock (pi->Parent->TerminateMutex) {
                    if (TerminateIsRequired(pi)) {
                        return;
                    }
                }

                waitPidResult =
#if defined(_unix_)
                    waitpid(pi->Parent->Pid, &status, WNOHANG);
#else
                    WaitForSingleObject(pi->Parent->Pid /* process_info.hProcess */, pi->Parent->Options_.PollDelayMs /* ms */);
                Y_UNUSED(status);
#endif
                // DBG(Cerr << "wait result: " << waitPidResult << Endl);
                if (waitPidResult != WAIT_PROCEED) {
                    break;
                }
            }
/// @todo factor out (poll + wfmo)
#if defined(_unix_)
            bool haveIn = false;
            bool haveOut = false;
            bool haveErr = false;

            if (!input && pi->InputFd.IsOpen()) {
                DBG(Cerr << "closing input stream..." << Endl);
                pi->InputFd.Close();
            }
            if (!output && pi->OutputFd.IsOpen()) {
                DBG(Cerr << "closing output stream..." << Endl);
                pi->OutputFd.Close();
            }
            if (!error && pi->ErrorFd.IsOpen()) {
                DBG(Cerr << "closing error stream..." << Endl);
                pi->ErrorFd.Close();
            }

            if (!input && !output && !error) {
                continue;
            }

            struct pollfd fds[] = {
                {REALPIPEHANDLE(pi->InputFd), POLLOUT, 0},
                {REALPIPEHANDLE(pi->OutputFd), POLLIN, 0},
                {REALPIPEHANDLE(pi->ErrorFd), POLLIN, 0}};
            int res;

            if (!input) {
                fds[0].events = 0;
            }
            if (!output) {
                fds[1].events = 0;
            }
            if (!error) {
                fds[2].events = 0;
            }

            res = PollD(fds, 3, TInstant::Now() + TDuration::MilliSeconds(pi->Parent->Options_.PollDelayMs));
            // DBG(Cerr << "poll result: " << res << Endl);
            if (-res == ETIMEDOUT || res == 0) {
                // DBG(Cerr << "poll again..." << Endl);
                continue;
            }
            if (res < 0) {
                ythrow yexception() << "poll failed: " << LastSystemErrorText();
            }

            if ((fds[1].revents & POLLIN) == POLLIN) {
                haveOut = true;
            } else if (fds[1].revents & (POLLERR | POLLHUP)) {
                output = nullptr;
            }

            if ((fds[2].revents & POLLIN) == POLLIN) {
                haveErr = true;
            } else if (fds[2].revents & (POLLERR | POLLHUP)) {
                error = nullptr;
            }

            if (input && ((fds[0].revents & POLLOUT) == POLLOUT)) {
                haveIn = true;
            }

            if (haveOut) {
                bytes = pi->OutputFd.Read(buffer.Data(), buffer.Capacity());
                DBG(Cerr << "transferred " << bytes << " bytes of output" << Endl);
                if (bytes > 0) {
                    output->Write(buffer.Data(), bytes);
                } else {
                    output = nullptr;
                }
            }
            if (haveErr) {
                bytes = pi->ErrorFd.Read(buffer.Data(), buffer.Capacity());
                DBG(Cerr << "transferred " << bytes << " bytes of error" << Endl);
                if (bytes > 0) {
                    error->Write(buffer.Data(), bytes);
                } else {
                    error = nullptr;
                }
            }

            if (haveIn) {
                if (!bytesToWrite) {
                    bytesToWrite = input->Read(inputBuffer.Data(), inputBuffer.Capacity());
                    if (bytesToWrite == 0) {
                        if (pi->Parent->Options_.ShouldCloseInput.load(std::memory_order_acquire)) {
                            input = nullptr;
                        }
                        continue;
                    }
                    bufPos = inputBuffer.Data();
                }

                bytes = pi->InputFd.Write(bufPos, bytesToWrite);
                if (bytes > 0) {
                    bytesToWrite -= bytes;
                    bufPos += bytes;
                } else {
                    input = nullptr;
                }

                DBG(Cerr << "transferred " << bytes << " bytes of input" << Endl);
            }
#endif
        }
        DBG(Cerr << "process finished" << Endl);

        // What's the reason of process exit.
        // We need to set exit code before waiting for input thread
        // Otherwise there is no way for input stream provider to discover
        // that process has exited and stream shouldn't wait for new data.
        bool cleanExit = false;
        TMaybe<int> processExitCode;
#if defined(_unix_)
        processExitCode = WEXITSTATUS(status);
        if (WIFEXITED(status) && processExitCode == 0) {
            cleanExit = true;
        } else if (WIFSIGNALED(status)) {
            processExitCode = -WTERMSIG(status);
        }
#else
        if (waitPidResult == WAIT_OBJECT_0) {
            DWORD exitCode = STILL_ACTIVE;
            if (!GetExitCodeProcess(pi->Parent->Pid, &exitCode)) {
                ythrow yexception() << "GetExitCodeProcess: " << LastSystemErrorText();
            }
            if (exitCode == 0)
                cleanExit = true;
            processExitCode = static_cast<int>(exitCode);
            DBG(Cerr << "exit code: " << exitCode << Endl);
        }
#endif
        pi->Parent->ExitCode = processExitCode;
        if (cleanExit) {
            pi->Parent->ExecutionStatus.store(SHELL_FINISHED, std::memory_order_release);
        } else {
            pi->Parent->ExecutionStatus.store(SHELL_ERROR, std::memory_order_release);
        }

#if defined(_win_)
        for (auto& threadHolder : streamThreads)
            threadHolder->Join();
        for (const auto pump : pumps) {
            if (!pump.InternalError.empty())
                throw yexception() << pump.InternalError;
        }
#else
        // Now let's read remaining stdout/stderr
        while (output && (bytes = pi->OutputFd.Read(buffer.Data(), buffer.Capacity())) > 0) {
            DBG(Cerr << bytes << " more bytes of output: " << Endl);
            output->Write(buffer.Data(), bytes);
        }
        while (error && (bytes = pi->ErrorFd.Read(buffer.Data(), buffer.Capacity())) > 0) {
            DBG(Cerr << bytes << " more bytes of error" << Endl);
            error->Write(buffer.Data(), bytes);
        }
#endif
    } catch (const yexception& e) {
        // Some error in watch occurred, set result to error
        pi->Parent->ExecutionStatus.store(SHELL_INTERNAL_ERROR, std::memory_order_release);
        pi->Parent->InternalError = e.what();
        if (input) {
            pi->InputFd.Close();
        }
        Cdbg << "shell command internal error: " << pi->Parent->InternalError << Endl;
    }
    // Now we can safely delete process info struct and other data
    pi->Parent->TerminateFlag = true;
    TerminateIsRequired(pi);
}

// Constructs a command from a command string plus an explicit argument list.
// All four parameters are passed unchanged to a newly allocated TImpl,
// which is then stored in Impl.
TShellCommand::TShellCommand(const TStringBuf cmd,
                             const TList<TString>& args,
                             const TShellCommandOptions& options,
                             const TString& workdir)
    : Impl(new TImpl(cmd, args, options, workdir))
{
}

// Constructs a command that has no separate argument list: equivalent to the
// four-argument constructor called with an empty TList<TString>.
TShellCommand::TShellCommand(const TStringBuf cmd, const TShellCommandOptions& options, const TString& workdir)
    : TShellCommand(cmd, TList<TString>(), options, workdir)
{
}

// Compiler-generated destructor, defined out of line in this source file.
// NOTE(review): presumably placed here so that TImpl is a complete type at the
// point where the Impl member is destroyed — confirm against the header.
TShellCommand::~TShellCommand() = default;

// Forwards a single argument to the implementation's AppendArgument().
// Returns *this so that several insertions can be chained.
TShellCommand& TShellCommand::operator<<(const TStringBuf argument) {
    TShellCommand& self = *this;
    self.Impl->AppendArgument(argument);
    return self;
}

// Returns a reference to the output string held by the implementation object.
const TString& TShellCommand::GetOutput() const {
    const TString& collectedOutput = Impl->GetOutput();
    return collectedOutput;
}

// Returns a reference to the error string held by the implementation object.
const TString& TShellCommand::GetError() const {
    const TString& collectedError = Impl->GetError();
    return collectedError;
}

// Returns a reference to the internal error text kept by the implementation object.
const TString& TShellCommand::GetInternalError() const {
    const TString& internalError = Impl->GetInternalError();
    return internalError;
}

// Reports the command status as returned by the implementation's GetStatus().
TShellCommand::ECommandStatus TShellCommand::GetStatus() const {
    const ECommandStatus currentStatus = Impl->GetStatus();
    return currentStatus;
}

// Returns whatever exit code the implementation reports; the TMaybe may be empty.
TMaybe<int> TShellCommand::GetExitCode() const {
    TMaybe<int> exitCode = Impl->GetExitCode();
    return exitCode;
}

// Returns the process id reported by the implementation's GetPid().
TProcessId TShellCommand::GetPid() const {
    const TProcessId processId = Impl->GetPid();
    return processId;
}

// Gives access to the input file handle owned by the implementation object.
TFileHandle& TShellCommand::GetInputHandle() {
    TFileHandle& inputHandle = Impl->GetInputHandle();
    return inputHandle;
}

// Gives access to the output file handle owned by the implementation object.
TFileHandle& TShellCommand::GetOutputHandle() {
    TFileHandle& outputHandle = Impl->GetOutputHandle();
    return outputHandle;
}

// Gives access to the error file handle owned by the implementation object.
TFileHandle& TShellCommand::GetErrorHandle() {
    TFileHandle& errorHandle = Impl->GetErrorHandle();
    return errorHandle;
}

// Delegates to the implementation's Run() and returns *this for call chaining.
TShellCommand& TShellCommand::Run() {
    TShellCommand& self = *this;
    self.Impl->Run();
    return self;
}

// Delegates to the implementation's Terminate() and returns *this for call chaining.
TShellCommand& TShellCommand::Terminate() {
    TShellCommand& self = *this;
    self.Impl->Terminate();
    return self;
}

// Delegates to the implementation's Wait() and returns *this for call chaining.
TShellCommand& TShellCommand::Wait() {
    TShellCommand& self = *this;
    self.Impl->Wait();
    return self;
}

// Delegates to the implementation's CloseInput() and returns *this for call chaining.
TShellCommand& TShellCommand::CloseInput() {
    TShellCommand& self = *this;
    self.Impl->CloseInput();
    return self;
}

// Returns the quoted command string produced by the implementation's GetQuotedCommand().
TString TShellCommand::GetQuotedCommand() const {
    TString quotedCommand = Impl->GetQuotedCommand();
    return quotedCommand;
}