aboutsummaryrefslogtreecommitdiffstats
path: root/util/system/pipe.cpp
blob: 012ee2c30eb1879e90180fbfc75b056b6e8d0c88 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include "pipe.h" 
 
#include <util/stream/output.h>
#include <util/generic/yexception.h> 

ssize_t TPipeHandle::Read(void* buffer, size_t byteCount) const noexcept {
#ifdef _win_
    return recv(Fd_, (char*)buffer, byteCount, 0); 
#else
    return read(Fd_, buffer, byteCount); 
#endif
}

ssize_t TPipeHandle::Write(const void* buffer, size_t byteCount) const noexcept {
#ifdef _win_
    return send(Fd_, (const char*)buffer, byteCount, 0); 
#else
    return write(Fd_, buffer, byteCount); 
#endif
}

bool TPipeHandle::Close() noexcept {
    bool ok = true;
    if (Fd_ != INVALID_PIPEHANDLE) { 
#ifdef _win_
        ok = closesocket(Fd_) == 0; 
#else
        ok = close(Fd_) == 0; 
#endif
    }
    Fd_ = INVALID_PIPEHANDLE; 
    return ok;
}

void TPipeHandle::Pipe(TPipeHandle& reader, TPipeHandle& writer, EOpenMode mode) {
    PIPEHANDLE fds[2]; 
#ifdef _win_ 
    int r = SocketPair(fds, false /* non-overlapped */, mode & CloseOnExec /* cloexec */);
#elif defined(_linux_)
    int r = pipe2(fds, mode & CloseOnExec ? O_CLOEXEC : 0);
#else 
    int r = pipe(fds); 
#endif 
    if (r < 0) { 
        ythrow TFileError() << "failed to create a pipe"; 
    } 
 
#if !defined(_win_) && !defined(_linux_)
    // Non-atomic wrt exec
    if (mode & CloseOnExec) {
        for (int i = 0; i < 2; ++i) {
            int flags = fcntl(fds[i], F_GETFD, 0);
            if (flags < 0) {
                ythrow TFileError() << "failed to get flags";
            }
            int r = fcntl(fds[i], F_SETFD, flags | FD_CLOEXEC);
            if (r < 0) {
                ythrow TFileError() << "failed to set flags";
            }
        }
    }
#endif

    TPipeHandle(fds[0]).Swap(reader); 
    TPipeHandle(fds[1]).Swap(writer); 
} 
 
class TPipe::TImpl: public TAtomicRefCount<TImpl> { 
public:
    TImpl()
        : Handle_(INVALID_PIPEHANDLE) 
    {
    }
 
    TImpl(PIPEHANDLE fd)
        : Handle_(fd) 
    {
    }

    inline ~TImpl() {
        Close();
    }

    bool IsOpen() {
        return Handle_.IsOpen(); 
    }

    inline void Close() {
        if (!Handle_.IsOpen()) { 
            return;
        } 
        if (!Handle_.Close()) { 
            ythrow TFileError() << "failed to close pipe"; 
        } 
    }

    TPipeHandle& GetHandle() noexcept {
        return Handle_; 
    }

    size_t Read(void* buffer, size_t count) const {
        ssize_t r = Handle_.Read(buffer, count); 
        if (r < 0) { 
            ythrow TFileError() << "failed to read from pipe"; 
        } 
        return r;
    }

    size_t Write(const void* buffer, size_t count) const {
        ssize_t r = Handle_.Write(buffer, count); 
        if (r < 0) { 
            ythrow TFileError() << "failed to write to pipe"; 
        } 
        return r;
    }
 
private:
    TPipeHandle Handle_; 
};

TPipe::TPipe()
    : Impl_(new TImpl) 
{
}

TPipe::TPipe(PIPEHANDLE fd)
    : Impl_(new TImpl(fd)) 
{
}

TPipe::~TPipe() = default;

void TPipe::Close() {
    Impl_->Close(); 
}

PIPEHANDLE TPipe::GetHandle() const noexcept {
    return Impl_->GetHandle(); 
}

bool TPipe::IsOpen() const noexcept {
    return Impl_->IsOpen(); 
}

size_t TPipe::Read(void* buf, size_t len) const {
    return Impl_->Read(buf, len); 
}

size_t TPipe::Write(const void* buf, size_t len) const {
    return Impl_->Write(buf, len); 
}

void TPipe::Pipe(TPipe& reader, TPipe& writer, EOpenMode mode) {
    TImplRef r(new TImpl()); 
    TImplRef w(new TImpl()); 
 
    TPipeHandle::Pipe(r->GetHandle(), w->GetHandle(), mode);
 
    r.Swap(reader.Impl_); 
    w.Swap(writer.Impl_); 
}