diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/stream/pipe.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/stream/pipe.cpp')
-rw-r--r-- | util/stream/pipe.cpp | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/util/stream/pipe.cpp b/util/stream/pipe.cpp new file mode 100644 index 0000000000..51be1934a7 --- /dev/null +++ b/util/stream/pipe.cpp @@ -0,0 +1,121 @@ +#include "pipe.h" + +#include <util/generic/yexception.h> + +#include <cstdio> +#include <cerrno> + +class TPipeBase::TImpl { +public: + inline TImpl(const TString& command, const char* mode) + : Pipe_(nullptr) + { +#ifndef _freebsd_ + if (strcmp(mode, "r+") == 0) { + ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD"; + } +#endif + Pipe_ = ::popen(command.data(), mode); + if (Pipe_ == nullptr) { + ythrow TSystemError() << "failed to open pipe: " << command.Quote(); + } + } + + inline ~TImpl() { + if (Pipe_ != nullptr) { + ::pclose(Pipe_); + } + } + +public: + FILE* Pipe_; +}; + +TPipeBase::TPipeBase(const TString& command, const char* mode) + : Impl_(new TImpl(command, mode)) +{ +} + +TPipeBase::~TPipeBase() = default; + +TPipeInput::TPipeInput(const TString& command) + : TPipeBase(command, "r") +{ +} + +size_t TPipeInput::DoRead(void* buf, size_t len) { + if (Impl_->Pipe_ == nullptr) { + return 0; + } + + size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_); + if (bytesRead == 0) { + int exitStatus = ::pclose(Impl_->Pipe_); + Impl_->Pipe_ = nullptr; + if (exitStatus == -1) { + ythrow TSystemError() << "pclose() failed"; + } else if (exitStatus != 0) { + ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")"; + } + } + return bytesRead; +} + +TPipeOutput::TPipeOutput(const TString& command) + : TPipeBase(command, "w") +{ +} + +void TPipeOutput::DoWrite(const void* buf, size_t len) { + if (Impl_->Pipe_ == nullptr || len != ::fwrite(buf, 1, len, Impl_->Pipe_)) { + ythrow TSystemError() << "fwrite failed"; + } +} + +void TPipeOutput::Close() { + int exitStatus = ::pclose(Impl_->Pipe_); + Impl_->Pipe_ = nullptr; + if (exitStatus == -1) { + ythrow TSystemError() << "pclose() failed"; + } else if (exitStatus != 0) { + ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")"; + } +} + +TPipedBase::TPipedBase(PIPEHANDLE fd) + : Handle_(fd) +{ +} + +TPipedBase::~TPipedBase() { + if (Handle_.IsOpen()) { + Handle_.Close(); + } +} + +TPipedInput::TPipedInput(PIPEHANDLE fd) + : TPipedBase(fd) +{ +} + +TPipedInput::~TPipedInput() = default; + +size_t TPipedInput::DoRead(void* buf, size_t len) { + if (!Handle_.IsOpen()) { + return 0; + } + return Handle_.Read(buf, len); +} + +TPipedOutput::TPipedOutput(PIPEHANDLE fd) + : TPipedBase(fd) +{ +} + +TPipedOutput::~TPipedOutput() = default; + +void TPipedOutput::DoWrite(const void* buf, size_t len) { + if (!Handle_.IsOpen() || static_cast<ssize_t>(len) != Handle_.Write(buf, len)) { + ythrow TSystemError() << "pipe writing failed"; + } +} |