1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
#include "pipe.h"
#include <util/generic/yexception.h>
#include <cstdio>
#include <cerrno>
class TPipeBase::TImpl {
public:
inline TImpl(const TString& command, const char* mode)
: Pipe_(nullptr)
{
#ifndef _freebsd_
if (strcmp(mode, "r+") == 0) {
ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD";
}
#endif
Pipe_ = ::popen(command.data(), mode);
if (Pipe_ == nullptr) {
ythrow TSystemError() << "failed to open pipe: " << command.Quote();
}
}
inline ~TImpl() {
if (Pipe_ != nullptr) {
::pclose(Pipe_);
}
}
public:
FILE* Pipe_;
};
TPipeBase::TPipeBase(const TString& command, const char* mode)
: Impl_(new TImpl(command, mode))
{
}
TPipeBase::~TPipeBase() = default;
TPipeInput::TPipeInput(const TString& command)
: TPipeBase(command, "r")
{
}
size_t TPipeInput::DoRead(void* buf, size_t len) {
if (Impl_->Pipe_ == nullptr) {
return 0;
}
size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_);
if (bytesRead == 0) {
int exitStatus = ::pclose(Impl_->Pipe_);
Impl_->Pipe_ = nullptr;
if (exitStatus == -1) {
ythrow TSystemError() << "pclose() failed";
} else if (exitStatus != 0) {
ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
}
}
return bytesRead;
}
TPipeOutput::TPipeOutput(const TString& command)
: TPipeBase(command, "w")
{
}
void TPipeOutput::DoWrite(const void* buf, size_t len) {
if (Impl_->Pipe_ == nullptr || len != ::fwrite(buf, 1, len, Impl_->Pipe_)) {
ythrow TSystemError() << "fwrite failed";
}
}
void TPipeOutput::Close() {
int exitStatus = ::pclose(Impl_->Pipe_);
Impl_->Pipe_ = nullptr;
if (exitStatus == -1) {
ythrow TSystemError() << "pclose() failed";
} else if (exitStatus != 0) {
ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
}
}
TPipedBase::TPipedBase(PIPEHANDLE fd)
: Handle_(fd)
{
}
TPipedBase::~TPipedBase() {
if (Handle_.IsOpen()) {
Handle_.Close();
}
}
TPipedInput::TPipedInput(PIPEHANDLE fd)
: TPipedBase(fd)
{
}
TPipedInput::~TPipedInput() = default;
size_t TPipedInput::DoRead(void* buf, size_t len) {
if (!Handle_.IsOpen()) {
return 0;
}
return Handle_.Read(buf, len);
}
TPipedOutput::TPipedOutput(PIPEHANDLE fd)
: TPipedBase(fd)
{
}
TPipedOutput::~TPipedOutput() = default;
void TPipedOutput::DoWrite(const void* buf, size_t len) {
if (!Handle_.IsOpen() || static_cast<ssize_t>(len) != Handle_.Write(buf, len)) {
ythrow TSystemError() << "pipe writing failed";
}
}
|