aboutsummaryrefslogtreecommitdiffstats
path: root/util/stream/pipe.cpp
blob: ff5b7788021c4b13fbd9212fe346fed5228ff229 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#include "pipe.h"

#include <util/generic/yexception.h>

#include <cstdio>
#include <cerrno>

// Private implementation of TPipeBase: holds the FILE* stream returned by
// popen() and releases it with pclose() on destruction.
class TPipeBase::TImpl {
public:
    // Launches `command` through popen() using the stdio-style `mode`.
    // Throws TSystemError(EINVAL) for mode "r+" unless built for FreeBSD,
    // and TSystemError when popen() returns a null stream.
    inline TImpl(const TString& command, const char* mode)
        : Pipe_(nullptr)
    {
#ifndef _freebsd_
        if (strcmp(mode, "r+") == 0) {
            ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD";
        }
#endif
        Pipe_ = ::popen(command.data(), mode);
        if (Pipe_ == nullptr) {
            ythrow TSystemError() << "failed to open pipe: " << command.Quote();
        }
    }

    // The stream has exactly one owner: an implicit copy would leave two
    // objects that both pclose() the same FILE* in their destructors.
    TImpl(const TImpl&) = delete;
    TImpl& operator=(const TImpl&) = delete;

    // Closes the stream unless it has already been closed and reset to nullptr.
    inline ~TImpl() {
        if (Pipe_ != nullptr) {
            ::pclose(Pipe_);
        }
    }

public:
    // Stream connected to the subprocess. Code in this file that calls
    // pclose() itself sets this back to nullptr so the destructor skips it.
    FILE* Pipe_;
};

// Creates the implementation object, which launches `command` through
// popen() with the given `mode`. Exceptions thrown by TImpl's constructor
// propagate to the caller.
TPipeBase::TPipeBase(const TString& command, const char* mode)
    : Impl_(new TImpl(command, mode))
{
}

// Defaulted out of line in this translation unit, where TImpl is fully
// defined (presumably required to destroy Impl_ — confirm against pipe.h).
TPipeBase::~TPipeBase() = default;

// Starts `command` with the pipe opened in "r" mode, so the subprocess
// output can be consumed through DoRead().
TPipeInput::TPipeInput(const TString& command)
    : TPipeBase(command, "r")
{
}

// Reads up to `len` bytes from the subprocess into `buf` and returns the
// number of bytes actually read. Returns 0 when the pipe has already been
// closed or when `len` is 0.
//
// When fread() delivers no data for a non-empty request, the pipe is closed
// with pclose() and its result is checked:
//   - throws TSystemError if pclose() returns -1;
//   - throws yexception if the returned status is non-zero.
size_t TPipeInput::DoRead(void* buf, size_t len) {
    // A zero-length request must not reach fread(): its 0 result would be
    // taken for end of stream and the pipe would be closed prematurely.
    if (Impl_->Pipe_ == nullptr || len == 0) {
        return 0;
    }

    size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_);
    if (bytesRead == 0) {
        int exitStatus = ::pclose(Impl_->Pipe_);
        // Reset before any throw so neither the destructor nor a later read
        // touches the closed stream again.
        Impl_->Pipe_ = nullptr;
        if (exitStatus == -1) {
            ythrow TSystemError() << "pclose() failed";
        } else if (exitStatus != 0) {
            ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
        }
    }
    return bytesRead;
}

// Starts `command` with the pipe opened in "w" mode, so data passed to
// DoWrite() is sent to the subprocess.
TPipeOutput::TPipeOutput(const TString& command)
    : TPipeBase(command, "w")
{
}

// Sends exactly `len` bytes from `buf` to the subprocess.
// Throws TSystemError when the pipe is already closed or when fwrite()
// accepts fewer than `len` bytes.
void TPipeOutput::DoWrite(const void* buf, size_t len) {
    FILE* const stream = Impl_->Pipe_;
    // fwrite() is attempted only when the stream is still open.
    const bool allWritten = stream != nullptr && ::fwrite(buf, 1, len, stream) == len;
    if (!allWritten) {
        ythrow TSystemError() << "fwrite failed";
    }
}

// Closes the pipe with pclose() and checks its result:
//   - throws TSystemError if pclose() returns -1;
//   - throws yexception if the returned status is non-zero.
// Calling Close() again after the pipe has been closed is a no-op.
void TPipeOutput::Close() {
    // Passing a null stream to pclose() is undefined behaviour, so a
    // repeated Close() must stop here.
    if (Impl_->Pipe_ == nullptr) {
        return;
    }

    int exitStatus = ::pclose(Impl_->Pipe_);
    // Reset before any throw so the destructor does not pclose() it again.
    Impl_->Pipe_ = nullptr;
    if (exitStatus == -1) {
        ythrow TSystemError() << "pclose() failed";
    } else if (exitStatus != 0) {
        ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
    }
}

// Wraps the pipe handle `fd` in Handle_. The destructor closes the handle
// if it is still open, so the object takes over responsibility for `fd`.
TPipedBase::TPipedBase(PIPEHANDLE fd)
    : Handle_(fd)
{
}

// Closes the wrapped handle if it is still open; the result of Close() is
// not inspected.
TPipedBase::~TPipedBase() {
    if (!Handle_.IsOpen()) {
        return;
    }
    Handle_.Close();
}

// Builds an input stream over the pipe handle `fd`, which is handed to
// TPipedBase.
TPipedInput::TPipedInput(PIPEHANDLE fd)
    : TPipedBase(fd)
{
}

// Nothing extra to release here; the handle is closed by ~TPipedBase().
TPipedInput::~TPipedInput() = default;

// Reads up to `len` bytes from the handle into `buf`.
// Returns 0 if the handle is not open; otherwise returns whatever
// Handle_.Read() reports, converted to size_t.
// NOTE(review): if Handle_.Read() can return a negative error value, the
// conversion to size_t turns it into a very large count — confirm against
// the handle's declaration.
size_t TPipedInput::DoRead(void* buf, size_t len) {
    return Handle_.IsOpen() ? Handle_.Read(buf, len) : 0;
}

// Builds an output stream over the pipe handle `fd`, which is handed to
// TPipedBase.
TPipedOutput::TPipedOutput(PIPEHANDLE fd)
    : TPipedBase(fd)
{
}

// Nothing extra to release here; the handle is closed by ~TPipedBase().
TPipedOutput::~TPipedOutput() = default;

// Writes `len` bytes from `buf` to the handle.
// Throws TSystemError when the handle is not open or when Handle_.Write()
// reports a count different from `len`.
void TPipedOutput::DoWrite(const void* buf, size_t len) {
    bool complete = false;
    if (Handle_.IsOpen()) {
        // Write() is attempted only on an open handle.
        complete = static_cast<ssize_t>(len) == Handle_.Write(buf, len);
    }
    if (!complete) {
        ythrow TSystemError() << "pipe writing failed";
    }
}