summaryrefslogtreecommitdiffstats
path: root/util/stream/pipe.h
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/stream/pipe.h
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/stream/pipe.h')
-rw-r--r--util/stream/pipe.h112
1 files changed, 112 insertions, 0 deletions
diff --git a/util/stream/pipe.h b/util/stream/pipe.h
new file mode 100644
index 00000000000..18525b9517d
--- /dev/null
+++ b/util/stream/pipe.h
@@ -0,0 +1,112 @@
+#pragma once
+
+#include "input.h"
+#include "output.h"
+
+#include <util/system/pipe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+
+/**
+ * @addtogroup Streams_Pipes
+ * @{
+ */
+
+/**
+ * Base class for starting a process and communicating with it via pipes.
+ */
+class TPipeBase {
+protected:
+ /**
+ * Starts a new process and opens a pipe.
+ *
+ * @param command Command line to start a process with.
+ * @param mode Data transfer mode for the pipe. Use
+ * "r" for reading and "w" for writing.
+ */
+ TPipeBase(const TString& command, const char* mode);
+ virtual ~TPipeBase();
+
+protected:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * Input stream that binds to a standard output stream of a newly started process.
+ *
+ * Note that if the process ends with non-zero exit status, `Read` function will
+ * throw an exception.
+ */
+class TPipeInput: protected TPipeBase, public IInputStream {
+public:
+ /**
+ * Starts a new process and opens a pipe.
+ *
+ * @param command Command line to start a process with.
+ */
+ TPipeInput(const TString& command);
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+};
+
+/**
+ * Output stream that binds to a standard input stream of a newly started process.
+ *
+ * Note that if the process ends with non-zero exit status, `Close` function will
+ * throw an exception.
+ */
+class TPipeOutput: protected TPipeBase, public IOutputStream {
+public:
+ /**
+ * Starts a new process and opens a pipe.
+ *
+ * @param command Command line to start a process with.
+ */
+ TPipeOutput(const TString& command);
+
+ /**
+ * Waits for the process to terminate and throws an exception if it ended
+ * with a non-zero exit status.
+ */
+ void Close();
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+};
+
+class TPipedBase {
+protected:
+ TPipedBase(PIPEHANDLE fd);
+ virtual ~TPipedBase();
+
+protected:
+ TPipeHandle Handle_;
+};
+
+/**
+ * Input stream that binds to a standard output stream of an existing process.
+ */
+class TPipedInput: public TPipedBase, public IInputStream {
+public:
+ TPipedInput(PIPEHANDLE fd);
+ ~TPipedInput() override;
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+};
+
+/**
+ * Output stream that binds to a standard input stream of an existing process.
+ */
+class TPipedOutput: public TPipedBase, public IOutputStream {
+public:
+ TPipedOutput(PIPEHANDLE fd);
+ ~TPipedOutput() override;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+};
+
+/** @} */