aboutsummaryrefslogtreecommitdiffstats
path: root/util/stream
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/stream
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/stream')
-rw-r--r--util/stream/aligned.cpp30
-rw-r--r--util/stream/aligned.h99
-rw-r--r--util/stream/aligned_ut.cpp63
-rw-r--r--util/stream/buffer.cpp120
-rw-r--r--util/stream/buffer.h119
-rw-r--r--util/stream/buffer_ut.cpp85
-rw-r--r--util/stream/buffered.cpp428
-rw-r--r--util/stream/buffered.h235
-rw-r--r--util/stream/buffered_ut.cpp142
-rw-r--r--util/stream/debug.cpp50
-rw-r--r--util/stream/debug.h53
-rw-r--r--util/stream/direct_io.cpp47
-rw-r--r--util/stream/direct_io.h43
-rw-r--r--util/stream/direct_io_ut.cpp71
-rw-r--r--util/stream/file.cpp97
-rw-r--r--util/stream/file.h108
-rw-r--r--util/stream/file_ut.cpp74
-rw-r--r--util/stream/format.cpp134
-rw-r--r--util/stream/format.h444
-rw-r--r--util/stream/format_ut.cpp182
-rw-r--r--util/stream/fwd.cpp1
-rw-r--r--util/stream/fwd.h100
-rw-r--r--util/stream/hex.cpp30
-rw-r--r--util/stream/hex.h8
-rw-r--r--util/stream/hex_ut.cpp29
-rw-r--r--util/stream/holder.cpp1
-rw-r--r--util/stream/holder.h44
-rw-r--r--util/stream/input.cpp344
-rw-r--r--util/stream/input.h273
-rw-r--r--util/stream/input_ut.cpp157
-rw-r--r--util/stream/ios_ut.cpp497
-rw-r--r--util/stream/labeled.cpp1
-rw-r--r--util/stream/labeled.h19
-rw-r--r--util/stream/labeled_ut.cpp12
-rw-r--r--util/stream/length.cpp47
-rw-r--r--util/stream/length.h100
-rw-r--r--util/stream/length_ut.cpp52
-rw-r--r--util/stream/mem.cpp65
-rw-r--r--util/stream/mem.h255
-rw-r--r--util/stream/mem_ut.cpp78
-rw-r--r--util/stream/multi.cpp56
-rw-r--r--util/stream/multi.h32
-rw-r--r--util/stream/multi_ut.cpp51
-rw-r--r--util/stream/null.cpp36
-rw-r--r--util/stream/null.h61
-rw-r--r--util/stream/output.cpp428
-rw-r--r--util/stream/output.h304
-rw-r--r--util/stream/output.pxd12
-rw-r--r--util/stream/pipe.cpp121
-rw-r--r--util/stream/pipe.h112
-rw-r--r--util/stream/printf.cpp51
-rw-r--r--util/stream/printf.h25
-rw-r--r--util/stream/printf_ut.cpp33
-rw-r--r--util/stream/str.cpp44
-rw-r--r--util/stream/str.h215
-rw-r--r--util/stream/str.pxd12
-rw-r--r--util/stream/str_ut.cpp152
-rw-r--r--util/stream/str_ut.pyx62
-rw-r--r--util/stream/tee.cpp24
-rw-r--r--util/stream/tee.h28
-rw-r--r--util/stream/tempbuf.cpp22
-rw-r--r--util/stream/tempbuf.h21
-rw-r--r--util/stream/tokenizer.cpp1
-rw-r--r--util/stream/tokenizer.h214
-rw-r--r--util/stream/tokenizer_ut.cpp264
-rw-r--r--util/stream/trace.cpp1
-rw-r--r--util/stream/trace.h60
-rw-r--r--util/stream/ut/ya.make30
-rw-r--r--util/stream/walk.cpp22
-rw-r--r--util/stream/walk.h35
-rw-r--r--util/stream/walk_ut.cpp55
-rw-r--r--util/stream/ya.make6
-rw-r--r--util/stream/zerocopy.cpp60
-rw-r--r--util/stream/zerocopy.h91
-rw-r--r--util/stream/zerocopy_output.cpp18
-rw-r--r--util/stream/zerocopy_output.h57
-rw-r--r--util/stream/zerocopy_output_ut.cpp48
-rw-r--r--util/stream/zlib.cpp380
-rw-r--r--util/stream/zlib.h173
-rw-r--r--util/stream/zlib_ut.cpp230
80 files changed, 8384 insertions, 0 deletions
diff --git a/util/stream/aligned.cpp b/util/stream/aligned.cpp
new file mode 100644
index 0000000000..2fd12d15b7
--- /dev/null
+++ b/util/stream/aligned.cpp
@@ -0,0 +1,30 @@
+#include "aligned.h"
+
+size_t TAlignedInput::DoRead(void* ptr, size_t len) {
+ size_t ret = Stream_->Read(ptr, len);
+ Position_ += ret;
+ return ret;
+}
+
+size_t TAlignedInput::DoSkip(size_t len) {
+ size_t ret = Stream_->Skip(len);
+ Position_ += ret;
+ return ret;
+}
+
+size_t TAlignedInput::DoReadTo(TString& st, char ch) {
+ size_t ret = Stream_->ReadTo(st, ch);
+ Position_ += ret;
+ return ret;
+}
+
+ui64 TAlignedInput::DoReadAll(IOutputStream& out) {
+ ui64 ret = Stream_->ReadAll(out);
+ Position_ += ret;
+ return ret;
+}
+
+void TAlignedOutput::DoWrite(const void* ptr, size_t len) {
+ Stream_->Write(ptr, len);
+ Position_ += len;
+}
diff --git a/util/stream/aligned.h b/util/stream/aligned.h
new file mode 100644
index 0000000000..70e7be05a9
--- /dev/null
+++ b/util/stream/aligned.h
@@ -0,0 +1,99 @@
+#pragma once
+
+#include "input.h"
+#include "output.h"
+
+#include <util/system/yassert.h>
+#include <util/generic/bitops.h>
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Proxy input stream that provides additional functions that make reading
+ * aligned data easier.
+ */
+class TAlignedInput: public IInputStream {
+public:
+ TAlignedInput(IInputStream* s)
+ : Stream_(s)
+ , Position_(0)
+ {
+ }
+
+ /**
+ * Ensures alignment of the position in the input stream by skipping
+ * some input.
+ *
+ * @param alignment Alignment. Must be a power of 2.
+ */
+ void Align(size_t alignment = sizeof(void*)) {
+ Y_ASSERT(IsPowerOf2(alignment));
+
+ if (Position_ & (alignment - 1)) {
+ size_t len = alignment - (Position_ & (alignment - 1));
+
+ do {
+ len -= DoSkip(len);
+ } while (len);
+ }
+ }
+
+private:
+ size_t DoRead(void* ptr, size_t len) override;
+ size_t DoSkip(size_t len) override;
+ size_t DoReadTo(TString& st, char ch) override;
+ ui64 DoReadAll(IOutputStream& out) override;
+
+private:
+ IInputStream* Stream_;
+ ui64 Position_;
+};
+
+/**
+ * Proxy output stream that provides additional functions that make writing
+ * aligned data easier.
+ */
+class TAlignedOutput: public IOutputStream {
+public:
+ TAlignedOutput(IOutputStream* s)
+ : Stream_(s)
+ , Position_(0)
+ {
+ }
+
+ TAlignedOutput(TAlignedOutput&&) noexcept = default;
+ TAlignedOutput& operator=(TAlignedOutput&&) noexcept = default;
+
+ size_t GetCurrentOffset() const {
+ return Position_;
+ }
+
+ /**
+ * Ensures alignment of the position in the output stream by writing
+ * some data.
+ *
+ * @param alignment Alignment. Must be a power of 2.
+ */
+ void Align(size_t alignment = sizeof(void*)) {
+ Y_ASSERT(IsPowerOf2(alignment));
+
+ static char unused[sizeof(void*) * 2];
+ Y_ASSERT(alignment <= sizeof(unused));
+
+ if (Position_ & (alignment - 1)) {
+ DoWrite(unused, alignment - (Position_ & (alignment - 1)));
+ }
+ }
+
+private:
+ void DoWrite(const void* ptr, size_t len) override;
+
+private:
+ IOutputStream* Stream_;
+ ui64 Position_;
+};
+
+/** @} */
diff --git a/util/stream/aligned_ut.cpp b/util/stream/aligned_ut.cpp
new file mode 100644
index 0000000000..e980d05cf7
--- /dev/null
+++ b/util/stream/aligned_ut.cpp
@@ -0,0 +1,63 @@
+#include "aligned.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+class TNastyInputStream: public IInputStream {
+public:
+ TNastyInputStream()
+ : Pos_(0)
+ {
+ }
+
+protected:
+ size_t DoRead(void* buf, size_t len) override {
+ if (len == 0) {
+ return 0;
+ }
+
+ *static_cast<unsigned char*>(buf) = static_cast<unsigned char>(Pos_);
+ ++Pos_;
+ return 1;
+ }
+
+ size_t DoSkip(size_t len) override {
+ if (len == 0) {
+ return 0;
+ }
+
+ ++Pos_;
+ return 1;
+ }
+
+private:
+ size_t Pos_;
+};
+
+Y_UNIT_TEST_SUITE(TAlignedTest) {
+ Y_UNIT_TEST(AlignInput) {
+ TNastyInputStream input0;
+ TAlignedInput alignedInput(&input0);
+
+ char c = '\1';
+
+ alignedInput.Align(2);
+ alignedInput.ReadChar(c);
+ UNIT_ASSERT_VALUES_EQUAL(c, '\x0');
+
+ alignedInput.Align(2);
+ alignedInput.ReadChar(c);
+ UNIT_ASSERT_VALUES_EQUAL(c, '\x2');
+
+ alignedInput.Align(4);
+ alignedInput.ReadChar(c);
+ UNIT_ASSERT_VALUES_EQUAL(c, '\x4');
+
+ alignedInput.Align(16);
+ alignedInput.ReadChar(c);
+ UNIT_ASSERT_VALUES_EQUAL(c, '\x10');
+
+ alignedInput.Align(128);
+ alignedInput.ReadChar(c);
+ UNIT_ASSERT_VALUES_EQUAL(c, '\x80');
+ }
+}
diff --git a/util/stream/buffer.cpp b/util/stream/buffer.cpp
new file mode 100644
index 0000000000..2facece4ea
--- /dev/null
+++ b/util/stream/buffer.cpp
@@ -0,0 +1,120 @@
+#include "buffer.h"
+#include <util/generic/buffer.h>
+#include <util/generic/yexception.h>
+
+class TBufferOutput::TImpl {
+public:
+ inline TImpl(TBuffer& buf)
+ : Data_(buf)
+ {
+ }
+
+ virtual ~TImpl() = default;
+
+ inline size_t DoNext(void** ptr) {
+ if (Data_.Avail() == 0) {
+ Data_.Reserve(FastClp2(Data_.Capacity() + MinBufferGrowSize));
+ }
+ size_t previousSize = Data_.size();
+ Data_.Resize(Data_.Capacity());
+ *ptr = Data_.Begin() + previousSize;
+ return Data_.Size() - previousSize;
+ }
+
+ inline void DoUndo(size_t len) {
+ Y_VERIFY(len <= Data_.Size(), "trying to undo more bytes than actually written");
+ Data_.Resize(Data_.size() - len);
+ }
+
+ inline void DoWrite(const void* buf, size_t len) {
+ Data_.Append((const char*)buf, len);
+ }
+
+ inline void DoWriteC(char c) {
+ Data_.Append(c);
+ }
+
+ inline TBuffer& Buffer() const noexcept {
+ return Data_;
+ }
+
+private:
+ TBuffer& Data_;
+ static constexpr size_t MinBufferGrowSize = 16;
+};
+
+namespace {
+ using TImpl = TBufferOutput::TImpl;
+
+ class TOwnedImpl: private TBuffer, public TImpl {
+ public:
+ inline TOwnedImpl(size_t buflen)
+ : TBuffer(buflen)
+ , TImpl(static_cast<TBuffer&>(*this))
+ {
+ }
+ };
+}
+
+TBufferOutput::TBufferOutput(size_t buflen)
+ : Impl_(new TOwnedImpl(buflen))
+{
+}
+
+TBufferOutput::TBufferOutput(TBuffer& buffer)
+ : Impl_(new TImpl(buffer))
+{
+}
+
+TBufferOutput::TBufferOutput(TBufferOutput&&) noexcept = default;
+TBufferOutput& TBufferOutput::operator=(TBufferOutput&&) noexcept = default;
+
+TBufferOutput::~TBufferOutput() = default;
+
+TBuffer& TBufferOutput::Buffer() const noexcept {
+ return Impl_->Buffer();
+}
+
+size_t TBufferOutput::DoNext(void** ptr) {
+ return Impl_->DoNext(ptr);
+}
+
+void TBufferOutput::DoUndo(size_t len) {
+ Impl_->DoUndo(len);
+}
+
+void TBufferOutput::DoWrite(const void* buf, size_t len) {
+ Impl_->DoWrite(buf, len);
+}
+
+void TBufferOutput::DoWriteC(char c) {
+ Impl_->DoWriteC(c);
+}
+
+TBufferInput::TBufferInput(const TBuffer& buffer)
+ : Buf_(buffer)
+ , Readed_(0)
+{
+}
+
+TBufferInput::~TBufferInput() = default;
+
+const TBuffer& TBufferInput::Buffer() const noexcept {
+ return Buf_;
+}
+
+void TBufferInput::Rewind() noexcept {
+ Readed_ = 0;
+}
+
+size_t TBufferInput::DoNext(const void** ptr, size_t len) {
+ len = Min(Buf_.Size() - Readed_, len);
+ *ptr = Buf_.data() + Readed_;
+ Readed_ += len;
+ return len;
+}
+
+void TBufferInput::DoUndo(size_t len) {
+ Y_VERIFY(len <= Readed_);
+ Readed_ -= len;
+}
diff --git a/util/stream/buffer.h b/util/stream/buffer.h
new file mode 100644
index 0000000000..9dc99dbe49
--- /dev/null
+++ b/util/stream/buffer.h
@@ -0,0 +1,119 @@
+#pragma once
+
+#include "zerocopy.h"
+#include "zerocopy_output.h"
+
+#include <util/generic/ptr.h>
+
+class TBuffer;
+
+/**
+ * @addtogroup Streams_Buffers
+ * @{
+ */
+
+/**
+ * Output stream that writes into a `TBuffer`.
+ */
+class TBufferOutput: public IZeroCopyOutput {
+public:
+ class TImpl;
+
+ /**
+ * Constructs a stream that writes into an internal buffer.
+ *
+ * @param buflen Initial size of the internal buffer.
+ */
+ TBufferOutput(size_t buflen = 1024);
+
+ /**
+ * Constructs a stream that writes into the provided buffer. It's up to the
+ * user to make sure that the buffer doesn't get destroyed while this stream
+ * is in use.
+ *
+ * @param buffer Buffer to write into.
+ */
+ TBufferOutput(TBuffer& buffer);
+
+ TBufferOutput(TBufferOutput&&) noexcept;
+ TBufferOutput& operator=(TBufferOutput&&) noexcept;
+
+ ~TBufferOutput() override;
+
+ /**
+ * @returns Buffer that this stream writes into.
+ */
+ TBuffer& Buffer() const noexcept;
+
+private:
+ size_t DoNext(void** ptr) override;
+ void DoUndo(size_t len) override;
+ void DoWrite(const void* buf, size_t len) override;
+ void DoWriteC(char c) override;
+
+private:
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * Input stream that reads from an external `TBuffer`.
+ */
+class TBufferInput: public IZeroCopyInputFastReadTo {
+public:
+ /**
+ * Constructs a stream that reads from an external buffer. It's up to the
+ * user to make sure that the buffer doesn't get destroyed before this
+ * stream.
+ *
+ * @param buffer External buffer to read from.
+ */
+ TBufferInput(const TBuffer& buffer);
+
+ ~TBufferInput() override;
+
+ const TBuffer& Buffer() const noexcept;
+
+ void Rewind() noexcept;
+
+protected:
+ size_t DoNext(const void** ptr, size_t len) override;
+ void DoUndo(size_t len) override;
+
+private:
+ const TBuffer& Buf_;
+ size_t Readed_;
+};
+
+/**
+ * Input/output stream that works with a `TBuffer`.
+ */
+class TBufferStream: public TBufferOutput, public TBufferInput {
+public:
+ /**
+ * Constructs a stream that works with an internal buffer.
+ *
+ * @param buflen Initial size of the internal buffer.
+ */
+ inline TBufferStream(size_t buflen = 1024)
+ : TBufferOutput(buflen)
+ , TBufferInput(TBufferOutput::Buffer())
+ {
+ }
+
+ /**
+ * Constructs a stream that works with the provided buffer.
+ *
+ * @param buffer Buffer to work with.
+ */
+ inline TBufferStream(TBuffer& buffer)
+ : TBufferOutput(buffer)
+ , TBufferInput(TBufferOutput::Buffer())
+ {
+ }
+
+ ~TBufferStream() override = default;
+
+ using TBufferOutput::Buffer;
+};
+
+/** @} */
diff --git a/util/stream/buffer_ut.cpp b/util/stream/buffer_ut.cpp
new file mode 100644
index 0000000000..3494696190
--- /dev/null
+++ b/util/stream/buffer_ut.cpp
@@ -0,0 +1,85 @@
+#include "buffer.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/buffer.h>
+
+#include <cstring>
+
+#include "str.h"
+
+Y_UNIT_TEST_SUITE(TBufferTest) {
+ Y_UNIT_TEST(Transfer) {
+ TBuffer buffer("razrazraz", 9);
+ TBufferInput input(buffer);
+
+ input.Skip(3);
+
+ TStringStream output;
+ TransferData(&input, &output);
+
+ UNIT_ASSERT_VALUES_EQUAL(output.Str(), "razraz");
+ }
+
+ Y_UNIT_TEST(ReadTo) {
+ TBuffer buffer("1234567890", 10);
+ TBufferInput input(buffer);
+
+ TString tmp;
+ UNIT_ASSERT_VALUES_EQUAL(input.ReadTo(tmp, '3'), 3);
+ UNIT_ASSERT_VALUES_EQUAL(tmp, "12");
+
+ UNIT_ASSERT_VALUES_EQUAL(input.ReadTo(tmp, 'z'), 7);
+ UNIT_ASSERT_VALUES_EQUAL(tmp, "4567890");
+ }
+
+ Y_UNIT_TEST(WriteViaNextAndUndo) {
+ TBuffer buffer;
+ TBufferOutput output(buffer);
+ TString str;
+
+ for (size_t i = 0; i < 10000; ++i) {
+ str.push_back('a' + (i % 20));
+ }
+
+ size_t written = 0;
+ void* ptr = nullptr;
+ while (written < str.size()) {
+ size_t bufferSize = output.Next(&ptr);
+ UNIT_ASSERT(ptr && bufferSize > 0);
+ size_t toWrite = Min(bufferSize, str.size() - written);
+ memcpy(ptr, str.begin() + written, toWrite);
+ written += toWrite;
+ if (toWrite < bufferSize) {
+ output.Undo(bufferSize - toWrite);
+ }
+ }
+
+ UNIT_ASSERT(0 == memcmp(buffer.data(), str.begin(), buffer.size()));
+ }
+
+ Y_UNIT_TEST(Write) {
+ TBuffer buffer;
+ TBufferOutput output(buffer);
+ output << "1"
+ << "22"
+ << "333"
+ << "4444"
+ << "55555";
+
+ UNIT_ASSERT(0 == memcmp(buffer.data(), "1"
+ "22"
+ "333"
+ "4444"
+ "55555",
+ buffer.size()));
+ }
+
+ Y_UNIT_TEST(WriteChars) {
+ TBuffer buffer;
+ TBufferOutput output(buffer);
+ output << '1' << '2' << '3' << '4' << '5' << '6' << '7' << '8' << '9' << '0';
+
+ UNIT_ASSERT(0 == memcmp(buffer.data(), "1234567890", buffer.size()));
+ }
+}
diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp
new file mode 100644
index 0000000000..a00e592e1c
--- /dev/null
+++ b/util/stream/buffered.cpp
@@ -0,0 +1,428 @@
+#include "mem.h"
+#include "buffered.h"
+
+#include <util/memory/addstorage.h>
+#include <util/generic/yexception.h>
+#include <util/generic/buffer.h>
+
+class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> {
+public:
+ inline TImpl(IInputStream* slave)
+ : Slave_(slave)
+ , MemInput_(nullptr, 0)
+ {
+ }
+
+ inline ~TImpl() = default;
+
+ inline size_t Next(const void** ptr, size_t len) {
+ if (MemInput_.Exhausted()) {
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
+ return MemInput_.Next(ptr, len);
+ }
+
+ inline size_t Read(void* buf, size_t len) {
+ if (MemInput_.Exhausted()) {
+ if (len > BufLen() / 2) {
+ return Slave_->Read(buf, len);
+ }
+
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
+ return MemInput_.Read(buf, len);
+ }
+
+ inline size_t Skip(size_t len) {
+ size_t totalSkipped = 0;
+ while (len) {
+ const size_t skipped = DoSkip(len);
+ if (skipped == 0) {
+ break;
+ }
+
+ totalSkipped += skipped;
+ len -= skipped;
+ }
+
+ return totalSkipped;
+ }
+
+ inline size_t DoSkip(size_t len) {
+ if (MemInput_.Exhausted()) {
+ if (len > BufLen() / 2) {
+ return Slave_->Skip(len);
+ }
+
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
+ return MemInput_.Skip(len);
+ }
+
+ inline size_t ReadTo(TString& st, char to) {
+ st.clear();
+
+ TString s_tmp;
+
+ size_t ret = 0;
+
+ while (true) {
+ if (MemInput_.Exhausted()) {
+ const size_t bytesRead = Slave_->Read(Buf(), BufLen());
+
+ if (!bytesRead) {
+ break;
+ }
+
+ MemInput_.Reset(Buf(), bytesRead);
+ }
+
+ const size_t a_len(MemInput_.Avail());
+ size_t s_len = 0;
+ if (st.empty()) {
+ ret += MemInput_.ReadTo(st, to);
+ s_len = st.length();
+ } else {
+ ret += MemInput_.ReadTo(s_tmp, to);
+ s_len = s_tmp.length();
+ st.append(s_tmp);
+ }
+
+ if (s_len != a_len) {
+ break;
+ }
+ }
+
+ return ret;
+ }
+
+ inline void Reset(IInputStream* slave) {
+ Slave_ = slave;
+ }
+
+private:
+ inline size_t BufLen() const noexcept {
+ return AdditionalDataLength();
+ }
+
+ inline void* Buf() const noexcept {
+ return AdditionalData();
+ }
+
+private:
+ IInputStream* Slave_;
+ TMemoryInput MemInput_;
+};
+
+TBufferedInput::TBufferedInput(IInputStream* slave, size_t buflen)
+ : Impl_(new (buflen) TImpl(slave))
+{
+}
+
+TBufferedInput::TBufferedInput(TBufferedInput&&) noexcept = default;
+TBufferedInput& TBufferedInput::operator=(TBufferedInput&&) noexcept = default;
+
+TBufferedInput::~TBufferedInput() = default;
+
+size_t TBufferedInput::DoRead(void* buf, size_t len) {
+ return Impl_->Read(buf, len);
+}
+
+size_t TBufferedInput::DoSkip(size_t len) {
+ return Impl_->Skip(len);
+}
+
+size_t TBufferedInput::DoNext(const void** ptr, size_t len) {
+ return Impl_->Next(ptr, len);
+}
+
+size_t TBufferedInput::DoReadTo(TString& st, char ch) {
+ return Impl_->ReadTo(st, ch);
+}
+
+void TBufferedInput::Reset(IInputStream* slave) {
+ Impl_->Reset(slave);
+}
+
+class TBufferedOutputBase::TImpl {
+public:
+ inline TImpl(IOutputStream* slave)
+ : Slave_(slave)
+ , MemOut_(nullptr, 0)
+ , PropagateFlush_(false)
+ , PropagateFinish_(false)
+ {
+ }
+
+ virtual ~TImpl() = default;
+
+ inline void Reset() {
+ MemOut_.Reset(Buf(), Len());
+ }
+
+ inline size_t Next(void** ptr) {
+ if (MemOut_.Avail() == 0) {
+ Slave_->Write(Buf(), Stored());
+ OnBufferExhausted();
+ Reset();
+ }
+
+ return MemOut_.Next(ptr);
+ }
+
+ inline void Undo(size_t len) {
+ Y_VERIFY(len <= Stored(), "trying to undo more bytes than actually written");
+ MemOut_.Undo(len);
+ }
+
+ inline void Write(const void* buf, size_t len) {
+ if (len <= MemOut_.Avail()) {
+ /*
+ * fast path
+ */
+
+ MemOut_.Write(buf, len);
+ } else {
+ const size_t stored = Stored();
+ const size_t full_len = stored + len;
+ const size_t good_len = DownToBufferGranularity(full_len);
+ const size_t write_from_buf = good_len - stored;
+
+ using TPart = IOutputStream::TPart;
+
+ alignas(TPart) char data[2 * sizeof(TPart)];
+ TPart* parts = reinterpret_cast<TPart*>(data);
+ TPart* end = parts;
+
+ if (stored) {
+ new (end++) TPart(Buf(), stored);
+ }
+
+ if (write_from_buf) {
+ new (end++) TPart(buf, write_from_buf);
+ }
+
+ Slave_->Write(parts, end - parts);
+
+ //grow buffer only on full flushes
+ OnBufferExhausted();
+ Reset();
+
+ if (write_from_buf < len) {
+ MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf);
+ }
+ }
+ }
+
+ inline void Write(char c) {
+ if (Y_UNLIKELY(MemOut_.Avail() == 0)) {
+ Slave_->Write(Buf(), Stored());
+ OnBufferExhausted();
+ Reset();
+ }
+
+ MemOut_.Write(c);
+ }
+
+ inline void SetFlushPropagateMode(bool mode) noexcept {
+ PropagateFlush_ = mode;
+ }
+
+ inline void SetFinishPropagateMode(bool mode) noexcept {
+ PropagateFinish_ = mode;
+ }
+
+ inline void Flush() {
+ {
+ Slave_->Write(Buf(), Stored());
+ Reset();
+ }
+
+ if (PropagateFlush_) {
+ Slave_->Flush();
+ }
+ }
+
+ inline void Finish() {
+ try {
+ Flush();
+ } catch (...) {
+ try {
+ DoFinish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
+ throw;
+ }
+
+ DoFinish();
+ }
+
+private:
+ inline void DoFinish() {
+ if (PropagateFinish_) {
+ Slave_->Finish();
+ }
+ }
+
+ inline size_t Stored() const noexcept {
+ return Len() - MemOut_.Avail();
+ }
+
+ inline size_t DownToBufferGranularity(size_t l) const noexcept {
+ return l - (l % Len());
+ }
+
+ virtual void OnBufferExhausted() = 0;
+ virtual void* Buf() const noexcept = 0;
+ virtual size_t Len() const noexcept = 0;
+
+private:
+ IOutputStream* Slave_;
+ TMemoryOutput MemOut_;
+ bool PropagateFlush_;
+ bool PropagateFinish_;
+};
+
+namespace {
+ struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> {
+ inline TSimpleImpl(IOutputStream* slave)
+ : TBufferedOutputBase::TImpl(slave)
+ {
+ Reset();
+ }
+
+ ~TSimpleImpl() override = default;
+
+ void OnBufferExhausted() final {
+ }
+
+ void* Buf() const noexcept override {
+ return AdditionalData();
+ }
+
+ size_t Len() const noexcept override {
+ return AdditionalDataLength();
+ }
+ };
+
+ struct TAdaptiveImpl: public TBufferedOutputBase::TImpl {
+ enum {
+ Step = 4096
+ };
+
+ inline TAdaptiveImpl(IOutputStream* slave)
+ : TBufferedOutputBase::TImpl(slave)
+ , N_(0)
+ {
+ B_.Reserve(Step);
+ Reset();
+ }
+
+ ~TAdaptiveImpl() override = default;
+
+ void OnBufferExhausted() final {
+ const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10);
+
+ if (c > B_.Capacity()) {
+ TBuffer(c).Swap(B_);
+ }
+ }
+
+ void* Buf() const noexcept override {
+ return (void*)B_.Data();
+ }
+
+ size_t Len() const noexcept override {
+ return B_.Capacity();
+ }
+
+ TBuffer B_;
+ ui64 N_;
+ };
+}
+
+TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave)
+ : Impl_(new TAdaptiveImpl(slave))
+{
+}
+
+TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave, size_t buflen)
+ : Impl_(new (buflen) TSimpleImpl(slave))
+{
+}
+
+TBufferedOutputBase::TBufferedOutputBase(TBufferedOutputBase&&) noexcept = default;
+TBufferedOutputBase& TBufferedOutputBase::operator=(TBufferedOutputBase&&) noexcept = default;
+
+TBufferedOutputBase::~TBufferedOutputBase() {
+ try {
+ Finish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
+size_t TBufferedOutputBase::DoNext(void** ptr) {
+ Y_ENSURE(Impl_.Get(), "cannot call next in finished stream");
+ return Impl_->Next(ptr);
+}
+
+void TBufferedOutputBase::DoUndo(size_t len) {
+ Y_ENSURE(Impl_.Get(), "cannot call undo in finished stream");
+ Impl_->Undo(len);
+}
+
+void TBufferedOutputBase::DoWrite(const void* data, size_t len) {
+ Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
+ Impl_->Write(data, len);
+}
+
+void TBufferedOutputBase::DoWriteC(char c) {
+ Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
+ Impl_->Write(c);
+}
+
+void TBufferedOutputBase::DoFlush() {
+ if (Impl_.Get()) {
+ Impl_->Flush();
+ }
+}
+
+void TBufferedOutputBase::DoFinish() {
+ THolder<TImpl> impl(Impl_.Release());
+
+ if (impl) {
+ impl->Finish();
+ }
+}
+
+void TBufferedOutputBase::SetFlushPropagateMode(bool propagate) noexcept {
+ if (Impl_.Get()) {
+ Impl_->SetFlushPropagateMode(propagate);
+ }
+}
+
+void TBufferedOutputBase::SetFinishPropagateMode(bool propagate) noexcept {
+ if (Impl_.Get()) {
+ Impl_->SetFinishPropagateMode(propagate);
+ }
+}
+
+TBufferedOutput::TBufferedOutput(IOutputStream* slave, size_t buflen)
+ : TBufferedOutputBase(slave, buflen)
+{
+}
+
+TBufferedOutput::~TBufferedOutput() = default;
+
+TAdaptiveBufferedOutput::TAdaptiveBufferedOutput(IOutputStream* slave)
+ : TBufferedOutputBase(slave)
+{
+}
+
+TAdaptiveBufferedOutput::~TAdaptiveBufferedOutput() = default;
diff --git a/util/stream/buffered.h b/util/stream/buffered.h
new file mode 100644
index 0000000000..0847186141
--- /dev/null
+++ b/util/stream/buffered.h
@@ -0,0 +1,235 @@
+#pragma once
+
+#include "zerocopy.h"
+#include "zerocopy_output.h"
+
+#include <utility>
+#include <util/generic/ptr.h>
+#include <util/generic/typetraits.h>
+#include <util/generic/store_policy.h>
+
+/**
+ * @addtogroup Streams_Buffered
+ * @{
+ */
+
+/**
+ * Input stream that wraps the given stream and adds a buffer on top of it,
+ * thus making sure that data is read from the underlying stream in big chunks.
+ *
+ * Note that it does not claim ownership of the underlying stream, so it's up
+ * to the user to free it.
+ */
+class TBufferedInput: public IZeroCopyInput {
+public:
+ TBufferedInput(IInputStream* slave, size_t buflen = 8192);
+
+ TBufferedInput(TBufferedInput&&) noexcept;
+ TBufferedInput& operator=(TBufferedInput&&) noexcept;
+
+ ~TBufferedInput() override;
+
+ /**
+ * Switches the underlying stream to the one provided. Does not clear the
+ * data that was already buffered.
+ *
+ * @param slave New underlying stream.
+ */
+ void Reset(IInputStream* slave);
+
+protected:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoReadTo(TString& st, char ch) override;
+ size_t DoSkip(size_t len) override;
+ size_t DoNext(const void** ptr, size_t len) override;
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * Output stream that wraps the given stream and adds a buffer on top of it,
+ * thus making sure that data is written to the underlying stream in big chunks.
+ *
+ * Note that by default this stream does not propagate `Flush` and `Finish`
+ * calls to the underlying stream, instead simply flushing out the buffer.
+ * You can change this behavior by using propagation mode setters.
+ *
+ * Also note that this stream does not claim ownership of the underlying stream,
+ * so it's up to the user to free it.
+ */
+class TBufferedOutputBase: public IZeroCopyOutput {
+public:
+ /**
+ * Constructs a buffered stream that dynamically adjusts the size of the
+ * buffer. This works best when the amount of data that will be passed
+ * through this stream is not known and can range in size from several
+ * kilobytes to several gigabytes.
+ *
+ * @param slave Underlying stream.
+ */
+ TBufferedOutputBase(IOutputStream* slave);
+
+ /**
+ * Constructs a buffered stream with the given size of the buffer.
+ *
+ * @param slave Underlying stream.
+ * @param buflen Size of the buffer.
+ */
+ TBufferedOutputBase(IOutputStream* slave, size_t buflen);
+
+ TBufferedOutputBase(TBufferedOutputBase&&) noexcept;
+ TBufferedOutputBase& operator=(TBufferedOutputBase&&) noexcept;
+
+ ~TBufferedOutputBase() override;
+
+ /**
+ * @param propagate Whether `Flush` and `Finish` calls should
+ * be propagated to the underlying stream.
+ * By default they are not.
+ */
+ inline void SetPropagateMode(bool propagate) noexcept {
+ SetFlushPropagateMode(propagate);
+ SetFinishPropagateMode(propagate);
+ }
+
+ /**
+ * @param propagate Whether `Flush` calls should be propagated
+ * to the underlying stream. By default they
+ * are not.
+ */
+ void SetFlushPropagateMode(bool propagate) noexcept;
+
+ /**
+ * @param propagate Whether `Finish` calls should be propagated
+ * to the underlying stream. By default they
+ * are not.
+ */
+ void SetFinishPropagateMode(bool propagate) noexcept;
+
+ class TImpl;
+
+protected:
+ size_t DoNext(void** ptr) override;
+ void DoUndo(size_t len) override;
+ void DoWrite(const void* data, size_t len) override;
+ void DoWriteC(char c) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+private:
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * Buffered output stream with a fixed-size buffer.
+ *
+ * @see TBufferedOutputBase
+ */
+class TBufferedOutput: public TBufferedOutputBase {
+public:
+ TBufferedOutput(IOutputStream* slave, size_t buflen = 8192);
+ ~TBufferedOutput() override;
+
+ TBufferedOutput(TBufferedOutput&&) noexcept = default;
+ TBufferedOutput& operator=(TBufferedOutput&&) noexcept = default;
+};
+
+/**
+ * Buffered output stream that dynamically adjusts the size of the buffer based
+ * on the amount of data that's passed through it.
+ *
+ * @see TBufferedOutputBase
+ */
+class TAdaptiveBufferedOutput: public TBufferedOutputBase {
+public:
+ TAdaptiveBufferedOutput(IOutputStream* slave);
+ ~TAdaptiveBufferedOutput() override;
+
+ TAdaptiveBufferedOutput(TAdaptiveBufferedOutput&&) noexcept = default;
+ TAdaptiveBufferedOutput& operator=(TAdaptiveBufferedOutput&&) noexcept = default;
+};
+
+namespace NPrivate {
+ struct TMyBufferedOutput: public TBufferedOutput {
+ inline TMyBufferedOutput(IOutputStream* slave, size_t buflen)
+ : TBufferedOutput(slave, buflen)
+ {
+ SetFinishPropagateMode(true);
+ }
+ };
+
+ template <class T>
+ struct TBufferedStreamFor {
+ using TResult = std::conditional_t<std::is_base_of<IInputStream, T>::value, TBufferedInput, TMyBufferedOutput>;
+ };
+}
+
+/**
+ * A mixin class that turns unbuffered stream into a buffered one.
+ *
+ * Note that using this mixin with a stream that is already buffered won't
+ * result in double buffering, e.g. `TBuffered<TBuffered<TUnbufferedFileInput>>` and
+ * `TBuffered<TUnbufferedFileInput>` are basically the same types.
+ *
+ * Example usage:
+ * @code
+ * TBuffered<TUnbufferedFileInput> file_input(1024, "/path/to/file");
+ * TBuffered<TUnbufferedFileOutput> file_output(1024, "/path/to/file");
+ * @endcode
+ * Here 1024 is the size of the buffer.
+ */
+template <class TSlave>
+class TBuffered: private TEmbedPolicy<TSlave>, public ::NPrivate::TBufferedStreamFor<TSlave>::TResult {
+ using TSlaveBase = TEmbedPolicy<TSlave>;
+ using TBufferedBase = typename ::NPrivate::TBufferedStreamFor<TSlave>::TResult;
+
+public:
+ template <typename... Args>
+ inline TBuffered(size_t b, Args&&... args)
+ : TSlaveBase(std::forward<Args>(args)...)
+ , TBufferedBase(TSlaveBase::Ptr(), b)
+ {
+ }
+
+ inline TSlave& Slave() noexcept {
+ return *this->Ptr();
+ }
+
+ TBuffered(const TBuffered&) = delete;
+ TBuffered& operator=(const TBuffered&) = delete;
+ TBuffered(TBuffered&&) = delete;
+ TBuffered& operator=(TBuffered&&) = delete;
+};
+
+/**
+ * A mixin class that turns unbuffered stream into an adaptively buffered one.
+ * Created stream differs from the one created via `TBuffered` template in that
+ * it dynamically adjusts the size of the buffer based on the amount of data
+ * that's passed through it.
+ *
+ * Example usage:
+ * @code
+ * TAdaptivelyBuffered<TUnbufferedFileOutput> file_output("/path/to/file");
+ * @endcode
+ */
+template <class TSlave>
+class TAdaptivelyBuffered: private TEmbedPolicy<TSlave>, public TAdaptiveBufferedOutput {
+ using TSlaveBase = TEmbedPolicy<TSlave>;
+
+public:
+ template <typename... Args>
+ inline TAdaptivelyBuffered(Args&&... args)
+ : TSlaveBase(std::forward<Args>(args)...)
+ , TAdaptiveBufferedOutput(TSlaveBase::Ptr())
+ {
+ }
+
+ TAdaptivelyBuffered(const TAdaptivelyBuffered&) = delete;
+ TAdaptivelyBuffered& operator=(const TAdaptivelyBuffered&) = delete;
+ TAdaptivelyBuffered(TAdaptivelyBuffered&& other) = delete;
+ TAdaptivelyBuffered& operator=(TAdaptivelyBuffered&& other) = delete;
+};
+
+/** @} */
diff --git a/util/stream/buffered_ut.cpp b/util/stream/buffered_ut.cpp
new file mode 100644
index 0000000000..41d2fc3030
--- /dev/null
+++ b/util/stream/buffered_ut.cpp
@@ -0,0 +1,142 @@
+#include "buffered.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/string.h>
+#include <util/random/mersenne.h>
+
+Y_UNIT_TEST_SUITE(TestBufferedIO) {
+ template <class TOut>
+ inline void Run(TOut&& out) {
+ TMersenne<ui64> r;
+
+ for (size_t i = 0; i < 1000; ++i) {
+ const size_t c = r.GenRand() % 10000;
+ TString s;
+
+ for (size_t j = 0; j < c; ++j) {
+ s.append('A' + (r.GenRand() % 10));
+ }
+
+ out.Write(s.data(), s.size());
+ }
+ }
+
+ Y_UNIT_TEST(TestEqual) {
+ TString s1;
+ TString s2;
+
+ Run(TBuffered<TStringOutput>(8000, s1));
+ Run(TAdaptivelyBuffered<TStringOutput>(s2));
+
+ UNIT_ASSERT_VALUES_EQUAL(s1, s2);
+ }
+
+ Y_UNIT_TEST(Test1) {
+ TString s;
+
+ TBuffered<TStringOutput>(100, s).Write("1", 1);
+
+ UNIT_ASSERT_VALUES_EQUAL(s, "1");
+ }
+
+ Y_UNIT_TEST(Test2) {
+ TString s;
+
+ TBuffered<TStringOutput>(1, s).Write("12", 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(s, "12");
+ }
+
+ Y_UNIT_TEST(Test3) {
+ TString s;
+
+ auto&& b = TBuffered<TStringOutput>(1, s);
+
+ b.Write("1", 1);
+ b.Write("12", 2);
+ b.Finish();
+
+ UNIT_ASSERT_VALUES_EQUAL(s, "112");
+ }
+
+ Y_UNIT_TEST(Test4) {
+ TString s;
+
+ auto&& b = TBuffered<TStringOutput>(1, s);
+
+ b.Write('1');
+ b.Write('2');
+ b.Write('3');
+ b.Finish();
+
+ UNIT_ASSERT_VALUES_EQUAL(s, "123");
+ }
+
+ template <class TOut>
+ inline void DoGenAndWrite(TOut&& output, TString& str) {
+ TMersenne<ui64> r;
+ for (size_t i = 0; i < 43210; ++i) {
+ str.append('A' + (r.GenRand() % 10));
+ }
+ size_t written = 0;
+ void* ptr = nullptr;
+ while (written < str.size()) {
+ size_t bufferSize = output.Next(&ptr);
+ UNIT_ASSERT(ptr && bufferSize > 0);
+ size_t toWrite = Min(bufferSize, str.size() - written);
+ memcpy(ptr, str.begin() + written, toWrite);
+ written += toWrite;
+ if (toWrite < bufferSize) {
+ output.Undo(bufferSize - toWrite);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestWriteViaNextAndUndo) {
+ TString str1, str2;
+ DoGenAndWrite(TBuffered<TStringOutput>(5000, str1), str2);
+
+ UNIT_ASSERT_STRINGS_EQUAL(str1, str2);
+ }
+
+ Y_UNIT_TEST(TestWriteViaNextAndUndoAdaptive) {
+ TString str1, str2;
+ DoGenAndWrite(TAdaptivelyBuffered<TStringOutput>(str1), str2);
+
+ UNIT_ASSERT_STRINGS_EQUAL(str1, str2);
+ }
+
+ Y_UNIT_TEST(TestInput) {
+ TString s("0123456789abcdefghijklmn");
+ TBuffered<TStringInput> in(5, s);
+ char c;
+ UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //1
+ UNIT_ASSERT_VALUES_EQUAL(c, '0');
+ UNIT_ASSERT_VALUES_EQUAL(in.Skip(4), 4); //5 end of buffer
+ UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //6
+ UNIT_ASSERT_VALUES_EQUAL(c, '5');
+ UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //9
+ UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //10 end of buffer
+ UNIT_ASSERT_VALUES_EQUAL(c, '9');
+ UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //13
+ UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //14 start new buffer
+ UNIT_ASSERT_VALUES_EQUAL(c, 'd');
+ UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 6); //20
+ UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //21 start new buffer
+ UNIT_ASSERT_VALUES_EQUAL(c, 'k');
+ UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 3); //24 eof
+ }
+
+ Y_UNIT_TEST(TestReadTo) {
+ TString s("0123456789abc");
+ TBuffered<TStringInput> in(2, s);
+ TString t;
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '7'), 8);
+ UNIT_ASSERT_VALUES_EQUAL(t, "0123456");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '8'), 1);
+ UNIT_ASSERT_VALUES_EQUAL(t, "");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 4);
+ UNIT_ASSERT_VALUES_EQUAL(t, "9abc");
+ }
+}
diff --git a/util/stream/debug.cpp b/util/stream/debug.cpp
new file mode 100644
index 0000000000..afd5b3e1c7
--- /dev/null
+++ b/util/stream/debug.cpp
@@ -0,0 +1,50 @@
+#include "null.h"
+#include "debug.h"
+
+#include <util/string/cast.h>
+#include <util/generic/singleton.h>
+#include <util/generic/yexception.h>
+
+#include <cstdio>
+#include <cstdlib>
+
+void TDebugOutput::DoWrite(const void* buf, size_t len) {
+ if (len != fwrite(buf, 1, len, stderr)) {
+ ythrow yexception() << "write failed(" << LastSystemErrorText() << ")";
+ }
+}
+
+namespace {
+ struct TDbgSelector {
+ inline TDbgSelector() {
+ char* dbg = getenv("DBGOUT");
+ if (dbg) {
+ Out = &Cerr;
+ try {
+ Level = FromString(dbg);
+ } catch (const yexception&) {
+ Level = 0;
+ }
+ } else {
+ Out = &Cnull;
+ Level = 0;
+ }
+ }
+
+ IOutputStream* Out;
+ int Level;
+ };
+}
+
+template <>
+struct TSingletonTraits<TDbgSelector> {
+ static constexpr size_t Priority = 8;
+};
+
+IOutputStream& StdDbgStream() noexcept {
+ return *(Singleton<TDbgSelector>()->Out);
+}
+
+int StdDbgLevel() noexcept {
+ return Singleton<TDbgSelector>()->Level;
+}
diff --git a/util/stream/debug.h b/util/stream/debug.h
new file mode 100644
index 0000000000..92d6d4b42d
--- /dev/null
+++ b/util/stream/debug.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include "output.h"
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Debug output stream. Writes into `stderr`.
+ */
+class TDebugOutput: public IOutputStream {
+public:
+ inline TDebugOutput() noexcept = default;
+ ~TDebugOutput() override = default;
+
+ TDebugOutput(TDebugOutput&&) noexcept = default;
+ TDebugOutput& operator=(TDebugOutput&&) noexcept = default;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+};
+
+/**
+ * @returns Standard debug stream.
+ * @see Cdbg
+ */
+IOutputStream& StdDbgStream() noexcept;
+
+/**
+ * This function returns the current debug level as set via `DBGOUT` environment
+ * variable.
+ *
+ * Note that the proper way to use this function is via `Y_DBGTRACE` macro.
+ * There are very few cases when there is a need to use it directly.
+ *
+ * @returns Debug level.
+ * @see ETraceLevel
+ * @see DBGTRACE
+ */
+int StdDbgLevel() noexcept;
+
+/**
+ * Standard debug stream.
+ *
+ * Behavior of this stream is controlled via `DBGOUT` environment variable.
+ * If this variable is set, then this stream is redirected into `stderr`,
+ * otherwise whatever is written into it is simply ignored.
+ */
+#define Cdbg (StdDbgStream())
+
+/** @} */
diff --git a/util/stream/direct_io.cpp b/util/stream/direct_io.cpp
new file mode 100644
index 0000000000..649033af34
--- /dev/null
+++ b/util/stream/direct_io.cpp
@@ -0,0 +1,47 @@
+#include "direct_io.h"
+
+#include <util/generic/utility.h>
+
+size_t TRandomAccessFileInput::DoRead(void* buf, size_t len) {
+ const size_t result = File.Pread(buf, len, Position);
+ Position += result;
+ return result;
+}
+
+TRandomAccessFileInput::TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position)
+ : File(file)
+ , Position(position)
+{
+}
+
+size_t TRandomAccessFileInput::DoSkip(size_t len) {
+ size_t skiped = Min(len, (size_t)Min((ui64)Max<size_t>(), File.GetLength() - Position));
+ Position += skiped;
+ return skiped;
+}
+
+TRandomAccessFileOutput::TRandomAccessFileOutput(TDirectIOBufferedFile& file)
+ : File(&file)
+{
+}
+
+void TRandomAccessFileOutput::DoWrite(const void* buf, size_t len) {
+ File->Write(buf, len);
+}
+
+void TRandomAccessFileOutput::DoFlush() {
+ File->FlushData();
+}
+
+TBufferedFileOutputEx::TBufferedFileOutputEx(const TString& path, EOpenMode oMode, size_t buflen)
+ : TRandomAccessFileOutput(*(new TDirectIOBufferedFile(path, oMode, buflen)))
+ , FileHolder(File)
+{
+}
+
+void TBufferedFileOutputEx::DoFinish() {
+ FileHolder->Finish();
+}
+
+void TBufferedFileOutputEx::DoFlush() {
+}
diff --git a/util/stream/direct_io.h b/util/stream/direct_io.h
new file mode 100644
index 0000000000..2e1f2e07dd
--- /dev/null
+++ b/util/stream/direct_io.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include "input.h"
+#include "output.h"
+#include <util/system/direct_io.h>
+
+class TRandomAccessFileInput: public IInputStream {
+public:
+ TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position);
+
+protected:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+
+private:
+ TDirectIOBufferedFile& File;
+ ui64 Position;
+};
+
+class TRandomAccessFileOutput: public IOutputStream {
+public:
+ TRandomAccessFileOutput(TDirectIOBufferedFile& file);
+
+ TRandomAccessFileOutput(TRandomAccessFileOutput&&) noexcept = default;
+ TRandomAccessFileOutput& operator=(TRandomAccessFileOutput&&) noexcept = default;
+
+protected:
+ TDirectIOBufferedFile* File;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+ void DoFlush() override;
+};
+
+class TBufferedFileOutputEx: public TRandomAccessFileOutput {
+public:
+ TBufferedFileOutputEx(const TString& path, EOpenMode oMode, size_t buflen = 1 << 17);
+
+private:
+ void DoFlush() override;
+ void DoFinish() override;
+ THolder<TDirectIOBufferedFile> FileHolder;
+};
diff --git a/util/stream/direct_io_ut.cpp b/util/stream/direct_io_ut.cpp
new file mode 100644
index 0000000000..01d09db232
--- /dev/null
+++ b/util/stream/direct_io_ut.cpp
@@ -0,0 +1,71 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/string.h>
+#include <util/generic/array_size.h>
+#include <util/system/env.h>
+
+#include "buffered.h"
+#include "direct_io.h"
+
+Y_UNIT_TEST_SUITE(TDirectIOTests) {
+ // Decrease numBufToWrite further if tests continue to time out
+ static void Y_NO_INLINE Test(EOpenMode mode, size_t numBufToWrite) {
+ const char TEMPLATE[] = "qwertyuiopQWERTYUIOPasdfghjklASD";
+ const auto TEMPLATE_SIZE = Y_ARRAY_SIZE(TEMPLATE) - 1;
+ static_assert(TEMPLATE_SIZE > 0, "must be greater than zero");
+
+ const size_t BUFFER_SIZE = 32 * 1024;
+ static_assert(0 == BUFFER_SIZE % TEMPLATE_SIZE, "must be divisible");
+
+ const size_t CHUNK_SIZE_TO_READ = 512;
+ static_assert(0 == CHUNK_SIZE_TO_READ % TEMPLATE_SIZE, "must be divisible");
+
+ // filling buffer
+ // TEMPLATE|TEMPLATE|TEMPLATE|...
+ auto&& buffer = TBuffer{BUFFER_SIZE};
+ for (size_t i = 0; i < BUFFER_SIZE / TEMPLATE_SIZE; ++i) {
+ buffer.Append(TEMPLATE, TEMPLATE_SIZE);
+ }
+
+ // filling file
+ // TEMPLATE|TEMPLATE|TEMPLATE|...
+ const auto fileName = TString("test.file");
+ auto&& directIOBuffer = TDirectIOBufferedFile{fileName, RdWr | CreateAlways | mode};
+ {
+ auto&& output = TRandomAccessFileOutput{directIOBuffer};
+ for (size_t i = 0; i < numBufToWrite; ++i) {
+ output.Write(buffer.Data(), BUFFER_SIZE);
+ }
+ }
+
+ auto&& reader = TRandomAccessFileInput{directIOBuffer, 0};
+ auto&& input = TBufferedInput{&reader, 1 << 17};
+ auto bytesRead = size_t{};
+ while (auto len = input.Read(buffer.Data(), CHUNK_SIZE_TO_READ)) {
+ bytesRead += len;
+ while (len) {
+ if (len < TEMPLATE_SIZE) {
+ UNIT_ASSERT(!memcmp(buffer.Data(), TEMPLATE, len));
+ len = 0;
+ } else {
+ UNIT_ASSERT(!memcmp(buffer.Data(), TEMPLATE, TEMPLATE_SIZE));
+ len -= TEMPLATE_SIZE;
+ }
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(bytesRead, numBufToWrite * BUFFER_SIZE);
+ }
+
+ Y_UNIT_TEST(ReadWriteTest) {
+ Test(0, 100 * 32);
+ }
+
+ Y_UNIT_TEST(ReadWriteDirectTest) {
+ Test(Direct, 100 * 4);
+ }
+
+ Y_UNIT_TEST(ReadWriteDirectSeqTest) {
+ Test(Direct | Seq, 100 * 4);
+ }
+}
diff --git a/util/stream/file.cpp b/util/stream/file.cpp
new file mode 100644
index 0000000000..dc5d2f6311
--- /dev/null
+++ b/util/stream/file.cpp
@@ -0,0 +1,97 @@
+#include "file.h"
+
+#include <util/memory/blob.h>
+#include <util/generic/yexception.h>
+
+TUnbufferedFileInput::TUnbufferedFileInput(const TString& path)
+ : File_(path, OpenExisting | RdOnly | Seq)
+{
+ if (!File_.IsOpen()) {
+ ythrow TIoException() << "file " << path << " not open";
+ }
+}
+
+TUnbufferedFileInput::TUnbufferedFileInput(const TFile& file)
+ : File_(file)
+{
+ if (!File_.IsOpen()) {
+ ythrow TIoException() << "file (" << file.GetName() << ") not open";
+ }
+}
+
+size_t TUnbufferedFileInput::DoRead(void* buf, size_t len) {
+ return File_.ReadOrFail(buf, len);
+}
+
+size_t TUnbufferedFileInput::DoSkip(size_t len) {
+ if (len < 384) {
+ /* Base implementation calls DoRead, which results in one system call
+ * instead of three as in fair skip implementation. For small sizes
+ * actually doing one read is cheaper. Experiments show that the
+ * border that separates two implementations performance-wise lies
+ * in the range of 384-512 bytes (assuming that the file is in OS cache). */
+ return IInputStream::DoSkip(len);
+ }
+
+ /* TFile::Seek can seek beyond the end of file, so we need to do
+ * size check here. */
+ i64 size = File_.GetLength();
+ i64 oldPos = File_.GetPosition();
+ i64 newPos = File_.Seek(Min<i64>(size, oldPos + len), sSet);
+
+ return newPos - oldPos;
+}
+
+TUnbufferedFileOutput::TUnbufferedFileOutput(const TString& path)
+ : File_(path, CreateAlways | WrOnly | Seq)
+{
+ if (!File_.IsOpen()) {
+ ythrow TFileError() << "can not open " << path;
+ }
+}
+
+TUnbufferedFileOutput::TUnbufferedFileOutput(const TFile& file)
+ : File_(file)
+{
+ if (!File_.IsOpen()) {
+ ythrow TIoException() << "closed file(" << file.GetName() << ") passed";
+ }
+}
+
+TUnbufferedFileOutput::~TUnbufferedFileOutput() = default;
+
+void TUnbufferedFileOutput::DoWrite(const void* buf, size_t len) {
+ File_.Write(buf, len);
+}
+
+void TUnbufferedFileOutput::DoFlush() {
+ if (File_.IsOpen()) {
+ File_.Flush();
+ }
+}
+
+class TMappedFileInput::TImpl: public TBlob {
+public:
+ inline TImpl(TFile file)
+ : TBlob(TBlob::FromFile(file))
+ {
+ }
+
+ inline ~TImpl() = default;
+};
+
+TMappedFileInput::TMappedFileInput(const TFile& file)
+ : TMemoryInput(nullptr, 0)
+ , Impl_(new TImpl(file))
+{
+ Reset(Impl_->Data(), Impl_->Size());
+}
+
+TMappedFileInput::TMappedFileInput(const TString& path)
+ : TMemoryInput(nullptr, 0)
+ , Impl_(new TImpl(TFile(path, OpenExisting | RdOnly)))
+{
+ Reset(Impl_->Data(), Impl_->Size());
+}
+
+TMappedFileInput::~TMappedFileInput() = default;
diff --git a/util/stream/file.h b/util/stream/file.h
new file mode 100644
index 0000000000..c1cf4f591d
--- /dev/null
+++ b/util/stream/file.h
@@ -0,0 +1,108 @@
+#pragma once
+
+#include "fwd.h"
+#include "input.h"
+#include "output.h"
+#include "buffered.h"
+#include "mem.h"
+
+#include <util/system/file.h>
+#include <utility>
+
+/**
+ * @addtogroup Streams_Files
+ * @{
+ */
+
+/**
+ * Unbuffered file input stream.
+ *
+ * Note that the input is not buffered, which means that `ReadLine` calls will
+ * be _very_ slow.
+ */
+class TUnbufferedFileInput: public IInputStream {
+public:
+ TUnbufferedFileInput(const TFile& file);
+ TUnbufferedFileInput(const TString& path);
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+
+private:
+ TFile File_;
+};
+
+/**
+ * Memory-mapped file input stream.
+ */
+class TMappedFileInput: public TMemoryInput {
+public:
+ TMappedFileInput(const TFile& file);
+ TMappedFileInput(const TString& path);
+ ~TMappedFileInput() override;
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * File output stream.
+ *
+ * Note that the output is unbuffered, thus writing in many small chunks is
+ * likely to be quite slow.
+ */
+class TUnbufferedFileOutput: public IOutputStream {
+public:
+ TUnbufferedFileOutput(const TString& path);
+ TUnbufferedFileOutput(const TFile& file);
+ ~TUnbufferedFileOutput() override;
+
+ TUnbufferedFileOutput(TUnbufferedFileOutput&&) noexcept = default;
+ TUnbufferedFileOutput& operator=(TUnbufferedFileOutput&&) noexcept = default;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+ void DoFlush() override;
+
+private:
+ TFile File_;
+};
+
+/**
+ * Buffered file input stream.
+ *
+ * @see TBuffered
+ */
+class TFileInput: public TBuffered<TUnbufferedFileInput> {
+public:
+ template <class T>
+ inline TFileInput(T&& t, size_t buf = 1 << 13)
+ : TBuffered<TUnbufferedFileInput>(buf, std::forward<T>(t))
+ {
+ }
+
+ ~TFileInput() override = default;
+};
+
+/**
+ * Buffered file output stream.
+ *
+ * Currently deprecated, please use TFileOutput in new code.
+ *
+ * @deprecated
+ * @see TBuffered
+ */
+class TFixedBufferFileOutput: public TBuffered<TUnbufferedFileOutput> {
+public:
+ template <class T>
+ inline TFixedBufferFileOutput(T&& t, size_t buf = 1 << 13)
+ : TBuffered<TUnbufferedFileOutput>(buf, std::forward<T>(t))
+ {
+ }
+
+ ~TFixedBufferFileOutput() override = default;
+};
+
+/** @} */
diff --git a/util/stream/file_ut.cpp b/util/stream/file_ut.cpp
new file mode 100644
index 0000000000..ac0f09796e
--- /dev/null
+++ b/util/stream/file_ut.cpp
@@ -0,0 +1,74 @@
+#include "file.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/tempfile.h>
+
+static const char* TmpFileName = "./fileio";
+static const char* TmpFileContents = "To do good to Mankind is the chivalrous plan";
+static const char* TmpFileSubstring = strstr(TmpFileContents, "chivalrous");
+
+Y_UNIT_TEST_SUITE(TFileTest) {
+ Y_UNIT_TEST(InputTest) {
+ TTempFile tmp(TmpFileName);
+
+ {
+ TUnbufferedFileOutput output(TmpFileName);
+ output.Write(TmpFileContents, strlen(TmpFileContents));
+ }
+
+ {
+ TUnbufferedFileInput input(TmpFileName);
+ TString s = input.ReadAll();
+ UNIT_ASSERT_VALUES_EQUAL(s, TmpFileContents);
+ }
+
+ {
+ TUnbufferedFileInput input(TmpFileName);
+ input.Skip(TmpFileSubstring - TmpFileContents);
+ TString s = input.ReadAll();
+ UNIT_ASSERT_VALUES_EQUAL(s, "chivalrous plan");
+ }
+
+ {
+ TUnbufferedFileOutput output(TFile::ForAppend(TmpFileName));
+ output.Write(TmpFileContents, strlen(TmpFileContents));
+ }
+
+ {
+ TUnbufferedFileInput input(TmpFileName);
+ TString s = input.ReadAll();
+ UNIT_ASSERT_VALUES_EQUAL(s, TString::Join(TmpFileContents, TmpFileContents));
+ }
+ }
+
+ Y_UNIT_TEST(EmptyMapTest) {
+ TTempFile tmp(TmpFileName);
+
+ {
+ TUnbufferedFileOutput output(TmpFileName);
+ /* Write nothing. */
+ }
+
+ {
+ TMappedFileInput input(TmpFileName);
+ TString s = input.ReadAll();
+ UNIT_ASSERT(s.empty());
+ }
+ }
+
+#ifdef _unix_
+ Y_UNIT_TEST(PipeReadLineTest) {
+ int fds[2];
+ UNIT_ASSERT(pipe(fds) == 0);
+ TFile readEnd(fds[0]);
+ TFileInput fileInput(readEnd);
+ UNIT_ASSERT_VALUES_EQUAL(write(fds[1], "hello\n", 6), 6);
+
+ TString line;
+ UNIT_ASSERT(fileInput.ReadLine(line));
+ UNIT_ASSERT_STRINGS_EQUAL(line, "hello");
+ close(fds[1]);
+ }
+#endif
+}
diff --git a/util/stream/format.cpp b/util/stream/format.cpp
new file mode 100644
index 0000000000..3996130df5
--- /dev/null
+++ b/util/stream/format.cpp
@@ -0,0 +1,134 @@
+#include "format.h"
+#include "output.h"
+
+#include <util/generic/ymath.h>
+#include <util/string/cast.h>
+
+namespace NFormatPrivate {
+ static inline i64 Round(double value) {
+ double res1 = floor(value);
+ double res2 = ceil(value);
+ return (value - res1 < res2 - value) ? (i64)res1 : (i64)res2;
+ }
+
+ static inline IOutputStream& PrintDoubleShortly(IOutputStream& os, const double& d) {
+ // General case: request 3 significant digits
+ // Side-effect: allows exponential representation
+ EFloatToStringMode mode = PREC_NDIGITS;
+ int ndigits = 3;
+
+ if (IsValidFloat(d) && Abs(d) < 1e12) {
+ // For reasonably-sized finite values, it's better to avoid
+ // exponential representation.
+ // Use compact fixed representation and determine
+ // precision based on magnitude.
+ mode = PREC_POINT_DIGITS_STRIP_ZEROES;
+ if (i64(Abs(d) * 100) < 1000) {
+ ndigits = 2;
+ } else if (i64(Abs(d) * 10) < 1000) {
+ ndigits = 1;
+ } else {
+ ndigits = 0;
+ }
+ }
+
+ return os << Prec(d, mode, ndigits);
+ }
+}
+
+template <>
+void Out<NFormatPrivate::THumanReadableSize>(IOutputStream& stream, const NFormatPrivate::THumanReadableSize& value) {
+ ui64 base = value.Format == SF_BYTES ? 1024 : 1000;
+ ui64 base2 = base * base;
+ ui64 base3 = base * base2;
+ ui64 base4 = base * base3;
+
+ double v = value.Value;
+ if (v < 0) {
+ stream << "-";
+ v = -v;
+ }
+
+ if (v < base) {
+ NFormatPrivate::PrintDoubleShortly(stream, v);
+ } else if (v < base2) {
+ NFormatPrivate::PrintDoubleShortly(stream, v / (double)base) << 'K';
+ } else if (v < base3) {
+ NFormatPrivate::PrintDoubleShortly(stream, v / (double)base2) << 'M';
+ } else if (v < base4) {
+ NFormatPrivate::PrintDoubleShortly(stream, v / (double)base3) << 'G';
+ } else {
+ NFormatPrivate::PrintDoubleShortly(stream, v / (double)base4) << 'T';
+ }
+
+ if (value.Format == SF_BYTES) {
+ if (v < base) {
+ stream << "B";
+ } else {
+ stream << "iB";
+ }
+ }
+}
+
+template <>
+void Out<NFormatPrivate::THumanReadableDuration>(IOutputStream& os, const NFormatPrivate::THumanReadableDuration& hr) {
+ TTempBuf buf;
+ TMemoryOutput ss(buf.Data(), buf.Size());
+
+ do {
+ ui64 microSeconds = hr.Value.MicroSeconds();
+ if (microSeconds < 1000) {
+ ss << microSeconds << "us";
+ break;
+ }
+ if (microSeconds < 1000 * 1000) {
+ NFormatPrivate::PrintDoubleShortly(ss, (double)microSeconds / 1000.0) << "ms";
+ break;
+ }
+
+ double seconds = (double)(hr.Value.MilliSeconds()) / 1000.0;
+ if (seconds < 60) {
+ NFormatPrivate::PrintDoubleShortly(ss, seconds) << 's';
+ break;
+ }
+
+ ui64 s = NFormatPrivate::Round(seconds * 1000 + 0.5) / 1000;
+
+ ui64 m = s / 60;
+ s = s % 60;
+
+ ui64 h = m / 60;
+ m = m % 60;
+
+ ui64 d = h / 24;
+ h = h % 24;
+
+ ui64 times[] = {d, h, m, s};
+ char names[] = {'d', 'h', 'm', 's'};
+ bool first = true;
+
+ for (size_t i = 0; i < Y_ARRAY_SIZE(times); ++i) {
+ if (times[i] > 0) {
+ if (!first) {
+ ss << ' ';
+ }
+ ss << times[i] << names[i];
+ first = false;
+ }
+ }
+ } while (false);
+
+ size_t written = buf.Size() - ss.Avail();
+ os.Write(buf.Data(), written);
+}
+
+void Time(IOutputStream& l) {
+ l << millisec();
+}
+
+void TimeHumanReadable(IOutputStream& l) {
+ char timeStr[30];
+ const time_t t = time(nullptr);
+
+ l << ctime_r(&t, timeStr);
+}
diff --git a/util/stream/format.h b/util/stream/format.h
new file mode 100644
index 0000000000..b033208a1b
--- /dev/null
+++ b/util/stream/format.h
@@ -0,0 +1,444 @@
+#pragma once
+
+#include "mem.h"
+#include "output.h"
+
+#include <util/datetime/base.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/flags.h>
+#include <util/memory/tempbuf.h>
+#include <util/string/cast.h>
+
+enum ENumberFormatFlag {
+ HF_FULL = 0x01, /**< Output number with leading zeros. */
+ HF_ADDX = 0x02, /**< Output '0x' or '0b' before hex/bin digits. */
+};
+Y_DECLARE_FLAGS(ENumberFormat, ENumberFormatFlag)
+Y_DECLARE_OPERATORS_FOR_FLAGS(ENumberFormat)
+
+enum ESizeFormat {
+ SF_QUANTITY, /**< Base 1000, usual suffixes. 1100 gets turned into "1.1K". */
+ SF_BYTES, /**< Base 1024, byte suffix. 1100 gets turned into "1.07KiB". */
+};
+
+namespace NFormatPrivate {
+ template <size_t Value>
+ struct TLog2: std::integral_constant<size_t, TLog2<Value / 2>::value + 1> {};
+
+ template <>
+ struct TLog2<1>: std::integral_constant<size_t, 0> {};
+
+ static inline void WriteChars(IOutputStream& os, char c, size_t count) {
+ if (count == 0)
+ return;
+ TTempBuf buf(count);
+ memset(buf.Data(), c, count);
+ os.Write(buf.Data(), count);
+ }
+
+ template <typename T>
+ struct TLeftPad {
+ T Value;
+ size_t Width;
+ char Padc;
+
+ inline TLeftPad(const T& value, size_t width, char padc)
+ : Value(value)
+ , Width(width)
+ , Padc(padc)
+ {
+ }
+ };
+
+ template <typename T>
+ IOutputStream& operator<<(IOutputStream& o, const TLeftPad<T>& lp) {
+ TTempBuf buf;
+ TMemoryOutput ss(buf.Data(), buf.Size());
+ ss << lp.Value;
+ size_t written = buf.Size() - ss.Avail();
+ if (lp.Width > written) {
+ WriteChars(o, lp.Padc, lp.Width - written);
+ }
+ o.Write(buf.Data(), written);
+ return o;
+ }
+
+ template <typename T>
+ struct TRightPad {
+ T Value;
+ size_t Width;
+ char Padc;
+
+ inline TRightPad(const T& value, size_t width, char padc)
+ : Value(value)
+ , Width(width)
+ , Padc(padc)
+ {
+ }
+ };
+
+ template <typename T>
+ IOutputStream& operator<<(IOutputStream& o, const TRightPad<T>& lp) {
+ TTempBuf buf;
+ TMemoryOutput ss(buf.Data(), buf.Size());
+ ss << lp.Value;
+ size_t written = buf.Size() - ss.Avail();
+ o.Write(buf.Data(), written);
+ if (lp.Width > written) {
+ WriteChars(o, lp.Padc, lp.Width - written);
+ }
+ return o;
+ }
+
+ template <typename T, size_t Base>
+ struct TBaseNumber {
+ T Value;
+ ENumberFormat Flags;
+
+ template <typename OtherT>
+ inline TBaseNumber(OtherT value, ENumberFormat flags)
+ : Value(value)
+ , Flags(flags)
+ {
+ }
+ };
+
+ template <typename T, size_t Base>
+ using TUnsignedBaseNumber = TBaseNumber<std::make_unsigned_t<std::remove_cv_t<T>>, Base>;
+
+ template <typename T, size_t Base>
+ IOutputStream& operator<<(IOutputStream& stream, const TBaseNumber<T, Base>& value) {
+ char buf[8 * sizeof(T) + 1]; /* Add 1 for sign. */
+ TStringBuf str(buf, IntToString<Base>(value.Value, buf, sizeof(buf)));
+
+ if (str[0] == '-') {
+ stream << '-';
+ str.Skip(1);
+ }
+
+ if (value.Flags & HF_ADDX) {
+ if (Base == 16) {
+ stream << TStringBuf("0x");
+ } else if (Base == 2) {
+ stream << TStringBuf("0b");
+ }
+ }
+
+ if (value.Flags & HF_FULL) {
+ WriteChars(stream, '0', (8 * sizeof(T) + TLog2<Base>::value - 1) / TLog2<Base>::value - str.size());
+ }
+
+ stream << str;
+ return stream;
+ }
+
+ template <typename Char, size_t Base>
+ struct TBaseText {
+ TBasicStringBuf<Char> Text;
+
+ inline TBaseText(const TBasicStringBuf<Char> text)
+ : Text(text)
+ {
+ }
+ };
+
+ template <typename Char, size_t Base>
+ IOutputStream& operator<<(IOutputStream& os, const TBaseText<Char, Base>& text) {
+ for (size_t i = 0; i < text.Text.size(); ++i) {
+ if (i != 0) {
+ os << ' ';
+ }
+ os << TUnsignedBaseNumber<Char, Base>(text.Text[i], HF_FULL);
+ }
+ return os;
+ }
+
+ template <typename T>
+ struct TFloatPrecision {
+ using TdVal = std::remove_cv_t<T>;
+ static_assert(std::is_floating_point<TdVal>::value, "expect std::is_floating_point<TdVal>::value");
+
+ TdVal Value;
+ EFloatToStringMode Mode;
+ int NDigits;
+ };
+
+ template <typename T>
+ IOutputStream& operator<<(IOutputStream& o, const TFloatPrecision<T>& prec) {
+ char buf[512];
+ size_t count = FloatToString(prec.Value, buf, sizeof(buf), prec.Mode, prec.NDigits);
+ o << TStringBuf(buf, count);
+ return o;
+ }
+
+ struct THumanReadableDuration {
+ TDuration Value;
+
+ constexpr THumanReadableDuration(const TDuration& value)
+ : Value(value)
+ {
+ }
+ };
+
+ struct THumanReadableSize {
+ double Value;
+ ESizeFormat Format;
+ };
+}
+
+/**
+ * Output manipulator basically equivalent to `std::setw` and `std::setfill`
+ * combined.
+ *
+ * When written into a `IOutputStream`, writes out padding characters first,
+ * and then provided value.
+ *
+ * Example usage:
+ * @code
+ * stream << LeftPad(12345, 10, '0'); // Will output "0000012345"
+ * @endcode
+ *
+ * @param value Value to output.
+ * @param width Target total width.
+ * @param padc Character to use for padding.
+ * @see RightPad
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TLeftPad<T> LeftPad(const T& value, const size_t width, const char padc = ' ') noexcept {
+ return ::NFormatPrivate::TLeftPad<T>(value, width, padc);
+}
+
+template <typename T, int N>
+static constexpr ::NFormatPrivate::TLeftPad<const T*> LeftPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept {
+ return ::NFormatPrivate::TLeftPad<const T*>(value, width, padc);
+}
+
+/**
+ * Output manipulator similar to `std::setw` and `std::setfill`.
+ *
+ * When written into a `IOutputStream`, writes provided value first, and then
+ * the padding characters.
+ *
+ * Example usage:
+ * @code
+ * stream << RightPad("column1", 10, ' '); // Will output "column1 "
+ * @endcode
+ *
+ * @param value Value to output.
+ * @param width Target total width.
+ * @param padc Character to use for padding.
+ * @see LeftPad
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TRightPad<T> RightPad(const T& value, const size_t width, const char padc = ' ') noexcept {
+ return ::NFormatPrivate::TRightPad<T>(value, width, padc);
+}
+
+template <typename T, int N>
+static constexpr ::NFormatPrivate::TRightPad<const T*> RightPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept {
+ return ::NFormatPrivate::TRightPad<const T*>(value, width, padc);
+}
+
+/**
+ * Output manipulator similar to `std::setbase(16)`.
+ *
+ * When written into a `IOutputStream`, writes out the provided value in
+ * hexadecimal form. The value is treated as unsigned, even if its type is in
+ * fact signed.
+ *
+ * Example usage:
+ * @code
+ * stream << Hex(-1); // Will output "0xFFFFFFFF"
+ * stream << Hex(1ull); // Will output "0x0000000000000001"
+ * @endcode
+ *
+ * @param value Value to output.
+ * @param flags Output flags.
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TUnsignedBaseNumber<T, 16> Hex(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept {
+ return {value, flags};
+}
+
+/**
+ * Output manipulator similar to `std::setbase(16)`.
+ *
+ * When written into a `IOutputStream`, writes out the provided value in
+ * hexadecimal form.
+ *
+ * Example usage:
+ * @code
+ * stream << SHex(-1); // Will output "-0x00000001"
+ * stream << SHex(1ull); // Will output "0x0000000000000001"
+ * @endcode
+ *
+ * @param value Value to output.
+ * @param flags Output flags.
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TBaseNumber<T, 16> SHex(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept {
+ return {value, flags};
+}
+
+/**
+ * Output manipulator similar to `std::setbase(2)`.
+ *
+ * When written into a `IOutputStream`, writes out the provided value in
+ * binary form. The value is treated as unsigned, even if its type is in
+ * fact signed.
+ *
+ * Example usage:
+ * @code
+ * stream << Bin(-1); // Will output "0b11111111111111111111111111111111"
+ * stream << Bin(1); // Will output "0b00000000000000000000000000000001"
+ * @endcode
+ *
+ * @param value Value to output.
+ * @param flags Output flags.
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TUnsignedBaseNumber<T, 2> Bin(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept {
+ return {value, flags};
+}
+
+/**
+ * Output manipulator similar to `std::setbase(2)`.
+ *
+ * When written into a `IOutputStream`, writes out the provided value in
+ * binary form.
+ *
+ * Example usage:
+ * @code
+ * stream << SBin(-1); // Will output "-0b00000000000000000000000000000001"
+ * stream << SBin(1); // Will output "0b00000000000000000000000000000001"
+ * @endcode
+ *
+ * @param value Value to output.
+ * @param flags Output flags.
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TBaseNumber<T, 2> SBin(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept {
+ return {value, flags};
+}
+
+/**
+ * Output manipulator for hexadecimal string output.
+ *
+ * When written into a `IOutputStream`, writes out the provided characters
+ * in hexadecimal form divided by space character.
+ *
+ * Example usage:
+ * @code
+ * stream << HexText(TStringBuf("abcи")); // Will output "61 62 63 D0 B8"
+ * stream << HexText(TWtringBuf(u"abcи")); // Will output "0061 0062 0063 0438"
+ * @endcode
+ *
+ * @param value String to output.
+ */
+template <typename TChar>
+static inline ::NFormatPrivate::TBaseText<TChar, 16> HexText(const TBasicStringBuf<TChar> value) {
+ return ::NFormatPrivate::TBaseText<TChar, 16>(value);
+}
+
+/**
+ * Output manipulator for binary string output.
+ *
+ * When written into a `IOutputStream`, writes out the provided characters
+ * in binary form divided by space character.
+ *
+ * Example usage:
+ * @code
+ * stream << BinText(TStringBuf("aaa")); // Will output "01100001 01100001 01100001"
+ * @endcode
+ *
+ * @param value String to output.
+ */
+template <typename TChar>
+static inline ::NFormatPrivate::TBaseText<TChar, 2> BinText(const TBasicStringBuf<TChar> value) {
+ return ::NFormatPrivate::TBaseText<TChar, 2>(value);
+}
+
+/**
+ * Output manipulator for printing `TDuration` values.
+ *
+ * When written into a `IOutputStream`, writes out the provided `TDuration`
+ * in auto-adjusted human-readable format.
+ *
+ * Example usage:
+ * @code
+ * stream << HumanReadable(TDuration::MicroSeconds(100)); // Will output "100us"
+ * stream << HumanReadable(TDuration::Seconds(3672)); // Will output "1h 1m 12s"
+ * @endcode
+ *
+ * @param value Value to output.
+ */
+static constexpr ::NFormatPrivate::THumanReadableDuration HumanReadable(const TDuration duration) noexcept {
+ return ::NFormatPrivate::THumanReadableDuration(duration);
+}
+
+/**
+ * Output manipulator for writing out human-readable number of elements / memory
+ * amount in `ls -h` style.
+ *
+ * When written into a `IOutputStream`, writes out the provided unsigned integer
+ * variable with small precision and a suffix (like 'K', 'M', 'G' for numbers, or
+ * 'B', 'KiB', 'MiB', 'GiB' for bytes).
+ *
+ * For quantities, base 1000 is used. For bytes, base is 1024.
+ *
+ * Example usage:
+ * @code
+ * stream << HumanReadableSize(1024, SF_QUANTITY); // Will output "1.02K"
+ * stream << HumanReadableSize(1024, SF_BYTES); // Will output "1KiB"
+ * stream << "average usage " << HumanReadableSize(100 / 3., SF_BYTES); // Will output "average usage "33.3B""
+ * @endcode
+ *
+ * @param value Value to output.
+ * @param format Format to use.
+ */
+static constexpr ::NFormatPrivate::THumanReadableSize HumanReadableSize(const double size, ESizeFormat format) noexcept {
+ return {size, format};
+}
+
+void Time(IOutputStream& l);
+void TimeHumanReadable(IOutputStream& l);
+
+/**
+ * Output manipulator for adjusting precision of floating point values.
+ *
+ * When written into a `IOutputStream`, writes out the provided floating point
+ * variable with given precision. The behavior depends on provided `mode`.
+ *
+ * Example usage:
+ * @code
+ * stream << Prec(1.2345678901234567, PREC_AUTO); // Will output "1.2345678901234567"
+ * @endcode
+ *
+ * @param value float or double to output.
+ * @param mode Output mode.
+ * @param ndigits Number of significant digits (in `PREC_NDIGITS` and `PREC_POINT_DIGITS` mode).
+ * @see EFloatToStringMode
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TFloatPrecision<T> Prec(const T& value, const EFloatToStringMode mode, const int ndigits = 0) noexcept {
+ return {value, mode, ndigits};
+}
+
+/**
+ * Output manipulator for adjusting precision of floating point values.
+ *
+ * When written into a `IOutputStream`, writes out the provided floating point
+ * variable with given precision. The behavior is equivalent to `Prec(value, PREC_NDIGITS, ndigits)`.
+ *
+ * Example usage:
+ * @code
+ * stream << Prec(1.2345678901234567, 3); // Will output "1.23"
+ * @endcode
+ *
+ * @param value float or double to output.
+ * @param ndigits Number of significant digits.
+ */
+template <typename T>
+static constexpr ::NFormatPrivate::TFloatPrecision<T> Prec(const T& value, const int ndigits) noexcept {
+ return {value, PREC_NDIGITS, ndigits};
+}
diff --git a/util/stream/format_ut.cpp b/util/stream/format_ut.cpp
new file mode 100644
index 0000000000..43245aeb48
--- /dev/null
+++ b/util/stream/format_ut.cpp
@@ -0,0 +1,182 @@
+#include "format.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/charset/wide.h>
+
+Y_UNIT_TEST_SUITE(TOutputStreamFormattingTest) {
+ Y_UNIT_TEST(TestLeftPad) {
+ TStringStream ss;
+ ss << LeftPad(10, 4, '0');
+ UNIT_ASSERT_VALUES_EQUAL("0010", ss.Str());
+
+ ss.Clear();
+ ss << LeftPad(222, 1);
+ UNIT_ASSERT_VALUES_EQUAL("222", ss.Str());
+ }
+
+ Y_UNIT_TEST(TestRightPad) {
+ TStringStream ss;
+ ss << RightPad("aa", 4);
+ UNIT_ASSERT_VALUES_EQUAL("aa ", ss.Str());
+
+ ss.Clear();
+ ss << RightPad("aa", 1);
+ UNIT_ASSERT_VALUES_EQUAL("aa", ss.Str());
+ }
+
+ Y_UNIT_TEST(TestTime) {
+ TStringStream ss;
+
+ ss << "[" << Time << "] "
+ << "qwqw" << TimeHumanReadable << Endl;
+ }
+
+ Y_UNIT_TEST(TestHexReference) {
+ /*
+ One possible implementation of Hex() stores a reference to the given object.
+ This can lead to wrong results if the given object is a temporary
+ which is valid only during constructor call. The following code tries to
+ demonstrate this. If the implementation stores a reference,
+ the test fails if compiled with g++44 in debug build
+ (without optimizations), but performs correctly in release build.
+ */
+ THolder<TStringStream> ss(new TStringStream);
+ THolder<int> ii(new int(0x1234567));
+ (*ss) << Hex(*ii);
+ UNIT_ASSERT_VALUES_EQUAL("0x01234567", ss->Str());
+ }
+
+ Y_UNIT_TEST(TestHexText) {
+ {
+ TStringStream ss;
+ ss << HexText(TStringBuf("abcи"));
+ UNIT_ASSERT_VALUES_EQUAL("61 62 63 D0 B8", ss.Str());
+ }
+ {
+ TStringStream ss;
+ TUtf16String w = UTF8ToWide("abcи");
+ ss << HexText<wchar16>(w);
+ UNIT_ASSERT_VALUES_EQUAL("0061 0062 0063 0438", ss.Str());
+ }
+ }
+
+ Y_UNIT_TEST(TestBin) {
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(2), nullptr)), "10");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(SBin(static_cast<i32>(-2), nullptr)), "-10");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(SBin(static_cast<i32>(-2))), "-0b00000000000000000000000000000010");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(SBin(static_cast<i32>(-2), HF_FULL)), "-00000000000000000000000000000010");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(15), nullptr)), "1111");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(1))), "0b00000000000000000000000000000001");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(-1))), "0b11111111111111111111111111111111");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<i32>(-1))), "0b11111111111111111111111111111111");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<i32>(-1), nullptr)), "11111111111111111111111111111111");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(256))), "0b00000000000000000000000100000000");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui8>(16))), "0b00010000");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui64>(1234587912357ull))), "0b0000000000000000000000010001111101110011001011001000100010100101");
+ }
+
+ Y_UNIT_TEST(TestBinText) {
+ UNIT_ASSERT_VALUES_EQUAL(ToString(BinText(TStringBuf("\1"))), "00000001");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(BinText(TStringBuf("\1\1"))), "00000001 00000001");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(BinText(TStringBuf("aaa"))), "01100001 01100001 01100001");
+ }
+
+ Y_UNIT_TEST(TestPrec) {
+ TStringStream ss;
+ ss << Prec(1.2345678901234567, PREC_AUTO);
+ UNIT_ASSERT_VALUES_EQUAL("1.2345678901234567", ss.Str());
+
+ ss.Clear();
+ ss << Prec(1.2345678901234567, 3);
+ UNIT_ASSERT_VALUES_EQUAL("1.23", ss.Str());
+
+ ss.Clear();
+ ss << Prec(1.2345678901234567, PREC_POINT_DIGITS, 3);
+ UNIT_ASSERT_VALUES_EQUAL("1.235", ss.Str());
+ }
+
+ Y_UNIT_TEST(TestHumanReadableSize1000) {
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(0, SF_QUANTITY)), "0");
+
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1, SF_QUANTITY)), "1");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000, SF_QUANTITY)), "1K");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1234567, SF_QUANTITY)), "1.23M");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(12345678, SF_QUANTITY)), "12.3M");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(12345678 * 1000ull, SF_QUANTITY)), "12.3G");
+
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1, SF_QUANTITY)), "-1");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000, SF_QUANTITY)), "-1K");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1234567, SF_QUANTITY)), "-1.23M");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-12345678, SF_QUANTITY)), "-12.3M");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-12345678 * 1000ll, SF_QUANTITY)), "-12.3G");
+ }
+
+ Y_UNIT_TEST(TestHumanReadableSize1024) {
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(0, SF_BYTES)), "0B");
+
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(100, SF_BYTES)), "100B");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1024, SF_BYTES)), "1KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(2.25 * 1024 * 1024, SF_BYTES)), "2.25MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(2.5 * 1024, SF_BYTES)), "2.5KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(45.3 * 1024, SF_BYTES)), "45.3KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1024 * 1024, SF_BYTES)), "1MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(5 * 1024 * 1024, SF_BYTES)), "5MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1236 * 1024 * 1024, SF_BYTES)), "1.21GiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1024ull * 1024 * 1024 * 1024, SF_BYTES)), "1TiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(100 / 3., SF_BYTES)), "33.3B");
+
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-100, SF_BYTES)), "-100B");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1024, SF_BYTES)), "-1KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-2.25 * 1024 * 1024, SF_BYTES)), "-2.25MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-2.5 * 1024, SF_BYTES)), "-2.5KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-45.3 * 1024, SF_BYTES)), "-45.3KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1024 * 1024, SF_BYTES)), "-1MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-5 * 1024 * 1024, SF_BYTES)), "-5MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1236 * 1024 * 1024, SF_BYTES)), "-1.21GiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1024ll * 1024 * 1024 * 1024, SF_BYTES)), "-1TiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-100 / 3., SF_BYTES)), "-33.3B");
+
+ // XXX: For 1000 <= x < 1024, Prec(x, 3) falls back to exponential form
+
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll, SF_BYTES)), "1000B");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll, SF_BYTES)), "1010B");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024, SF_BYTES)), "1000KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024, SF_BYTES)), "1010KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024 * 1024, SF_BYTES)), "1000MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024 * 1024, SF_BYTES)), "1010MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024 * 1024 * 1024, SF_BYTES)), "1000GiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024 * 1024 * 1024, SF_BYTES)), "1010GiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "1000TiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "1010TiB");
+
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll, SF_BYTES)), "-1000B");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll, SF_BYTES)), "-1010B");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024, SF_BYTES)), "-1000KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024, SF_BYTES)), "-1010KiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024 * 1024, SF_BYTES)), "-1000MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024 * 1024, SF_BYTES)), "-1010MiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024 * 1024 * 1024, SF_BYTES)), "-1000GiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024 * 1024 * 1024, SF_BYTES)), "-1010GiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "-1000TiB");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "-1010TiB");
+ }
+
+ Y_UNIT_TEST(TestHumanReadableDuration) {
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(0))), "0us");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(1))), "1us");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(100))), "100us");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(1234))), "1.23ms");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(12345))), "12.3ms");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(1234567))), "1.23s");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(5))), "5s");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(59.9))), "59.9s");
+
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(60))), "1m");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(61))), "1m 1s");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(72))), "1m 12s");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(620))), "10m 20s");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(3600))), "1h");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(3672))), "1h 1m 12s");
+ UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(4220))), "1h 10m 20s");
+ }
+}
diff --git a/util/stream/fwd.cpp b/util/stream/fwd.cpp
new file mode 100644
index 0000000000..4214b6df83
--- /dev/null
+++ b/util/stream/fwd.cpp
@@ -0,0 +1 @@
+#include "fwd.h"
diff --git a/util/stream/fwd.h b/util/stream/fwd.h
new file mode 100644
index 0000000000..307676c6a7
--- /dev/null
+++ b/util/stream/fwd.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include <util/system/types.h>
+
+class IInputStream;
+class IOutputStream;
+
+class IZeroCopyInput;
+class IZeroCopyInputFastReadTo;
+class IZeroCopyOutput;
+
+using TStreamManipulator = void (*)(IOutputStream&);
+
+class TLengthLimitedInput;
+class TCountingInput;
+class TCountingOutput;
+
+class TMemoryInput;
+class TMemoryOutput;
+class TMemoryWriteBuffer;
+
+class TMultiInput;
+
+class TNullInput;
+class TNullOutput;
+class TNullIO;
+
+class TPipeBase;
+class TPipeInput;
+class TPipeOutput;
+class TPipedBase;
+class TPipedInput;
+class TPipedOutput;
+
+class TStringInput;
+class TStringOutput;
+class TStringStream;
+
+class TTeeOutput;
+
+class TTempBufOutput;
+
+struct TEol;
+
+template <typename TEndOfToken>
+class TStreamTokenizer;
+
+enum ETraceLevel: ui8;
+
+class IWalkInput;
+
+struct TZLibError;
+struct TZLibCompressorError;
+struct TZLibDecompressorError;
+
+namespace ZLib {
+ enum StreamType: ui8;
+}
+
+class TZLibDecompress;
+class TZLibCompress;
+class TBufferedZLibDecompress;
+
+using TZDecompress = TBufferedZLibDecompress;
+
+class TAlignedInput;
+class TAlignedOutput;
+
+class TBufferInput;
+class TBufferOutput;
+class TBufferStream;
+
+class TBufferedInput;
+class TBufferedOutputBase;
+class TBufferedOutput;
+class TAdaptiveBufferedOutput;
+
+template <class TSlave>
+class TBuffered;
+
+template <class TSlave>
+class TAdaptivelyBuffered;
+
+class TDebugOutput;
+
+class TRandomAccessFileInput;
+class TRandomAccessFileOutput;
+class TBufferedFileOutputEx;
+
+class TUnbufferedFileInput;
+class TMappedFileInput;
+class TUnbufferedFileOutput;
+
+class TFileInput;
+using TIFStream = TFileInput;
+
+class TFixedBufferFileOutput;
+using TOFStream = TFixedBufferFileOutput;
+
+using TFileOutput = TAdaptivelyBuffered<TUnbufferedFileOutput>;
diff --git a/util/stream/hex.cpp b/util/stream/hex.cpp
new file mode 100644
index 0000000000..1c05330504
--- /dev/null
+++ b/util/stream/hex.cpp
@@ -0,0 +1,30 @@
+#include "hex.h"
+
+#include "output.h"
+#include <util/string/hex.h>
+
+void HexEncode(const void* in, size_t len, IOutputStream& out) {
+ static const size_t NUM_OF_BYTES = 32;
+ char buffer[NUM_OF_BYTES * 2];
+
+ auto current = static_cast<const char*>(in);
+ for (size_t take = 0; len; current += take, len -= take) {
+ take = Min(NUM_OF_BYTES, len);
+ HexEncode(current, take, buffer);
+ out.Write(buffer, take * 2);
+ }
+}
+
+void HexDecode(const void* in, size_t len, IOutputStream& out) {
+ Y_ENSURE(!(len & 1), TStringBuf("Odd buffer length passed to HexDecode"));
+
+ static const size_t NUM_OF_BYTES = 32;
+ char buffer[NUM_OF_BYTES];
+
+ auto current = static_cast<const char*>(in);
+ for (size_t take = 0; len; current += take, len -= take) {
+ take = Min(NUM_OF_BYTES * 2, len);
+ HexDecode(current, take, buffer);
+ out.Write(buffer, take / 2);
+ }
+}
diff --git a/util/stream/hex.h b/util/stream/hex.h
new file mode 100644
index 0000000000..a018933b1b
--- /dev/null
+++ b/util/stream/hex.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include <util/system/types.h>
+
+class IOutputStream;
+
+void HexEncode(const void* in, size_t len, IOutputStream& out);
+void HexDecode(const void* in, size_t len, IOutputStream& out);
diff --git a/util/stream/hex_ut.cpp b/util/stream/hex_ut.cpp
new file mode 100644
index 0000000000..5074a0b616
--- /dev/null
+++ b/util/stream/hex_ut.cpp
@@ -0,0 +1,29 @@
+#include "hex.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include "str.h"
+
+Y_UNIT_TEST_SUITE(THexCodingTest) {
+ void TestImpl(const TString& data) {
+ TString encoded;
+ TStringOutput encodedOut(encoded);
+ HexEncode(data.data(), data.size(), encodedOut);
+
+ UNIT_ASSERT_EQUAL(encoded.size(), data.size() * 2);
+
+ TString decoded;
+ TStringOutput decodedOut(decoded);
+ HexDecode(encoded.data(), encoded.size(), decodedOut);
+
+ UNIT_ASSERT_EQUAL(decoded, data);
+ }
+
+ Y_UNIT_TEST(TestEncodeDecodeToStream) {
+ TString data = "100ABAcaba500,$%0987123456 \n\t\x01\x02\x03.";
+ TestImpl(data);
+ }
+
+ Y_UNIT_TEST(TestEmpty) {
+ TestImpl("");
+ }
+}
diff --git a/util/stream/holder.cpp b/util/stream/holder.cpp
new file mode 100644
index 0000000000..f5617eef58
--- /dev/null
+++ b/util/stream/holder.cpp
@@ -0,0 +1 @@
+#include "holder.h"
diff --git a/util/stream/holder.h b/util/stream/holder.h
new file mode 100644
index 0000000000..c60a4e510c
--- /dev/null
+++ b/util/stream/holder.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+
+#include <utility>
+#include <type_traits>
+
+class IInputStream;
+class IOutputStream;
+
+namespace NPrivate {
+ template <class Stream, bool isInput = std::is_base_of<IInputStream, Stream>::value>
+ struct TStreamBase {
+ using TType = IInputStream;
+ };
+
+ template <class Stream>
+ struct TStreamBase<Stream, false> {
+ using TType = IOutputStream;
+ };
+
+}
+
+/**
+ * An ownership-gaining wrapper for proxy streams.
+ *
+ * Example usage:
+ * \code
+ * TCountingInput* input = new THoldingStream<TCountingInput>(new TStringInput(s));
+ * \encode
+ *
+ * In this example, resulting counting input also owns a string input that it
+ * was constructed on top of.
+ */
+template <class Base, class StreamBase = typename ::NPrivate::TStreamBase<Base>::TType>
+class THoldingStream: private THolder<StreamBase>, public Base {
+public:
+ template <class... Args>
+ inline THoldingStream(THolder<StreamBase> stream, Args&&... args)
+ : THolder<StreamBase>(std::move(stream))
+ , Base(this->Get(), std::forward<Args>(args)...)
+ {
+ }
+};
diff --git a/util/stream/input.cpp b/util/stream/input.cpp
new file mode 100644
index 0000000000..6e8170f2f9
--- /dev/null
+++ b/util/stream/input.cpp
@@ -0,0 +1,344 @@
+#include "input.h"
+#include "output.h"
+#include "str.h"
+
+#include <util/charset/wide.h>
+#include <util/memory/tempbuf.h>
+#include <util/generic/string.h>
+#include <util/generic/yexception.h>
+#include <util/generic/singleton.h>
+#include <util/string/cast.h>
+#include <util/system/compat.h>
+#include <util/system/spinlock.h>
+
+#include <cstdlib>
+
+IInputStream::IInputStream() noexcept = default;
+
+IInputStream::~IInputStream() = default;
+
+size_t IInputStream::DoReadTo(TString& st, char to) {
+ char ch;
+
+ if (!Read(&ch, 1)) {
+ return 0;
+ }
+
+ st.clear();
+
+ size_t result = 0;
+ do {
+ ++result;
+
+ if (ch == to) {
+ break;
+ }
+
+ st += ch;
+ } while (Read(&ch, 1));
+
+ return result;
+}
+
+ui64 IInputStream::DoReadAll(IOutputStream& out) {
+ TTempBuf buffer;
+ void* ptr = buffer.Data();
+ size_t size = buffer.Size();
+
+ ui64 result = 0;
+ while (size_t read = Read(ptr, size)) {
+ out.Write(ptr, read);
+ result += read;
+ }
+
+ return result;
+}
+
+size_t IInputStream::Load(void* buf_in, size_t len) {
+ char* buf = (char*)buf_in;
+
+ while (len) {
+ const size_t ret = Read(buf, len);
+
+ buf += ret;
+ len -= ret;
+
+ if (ret == 0) {
+ break;
+ }
+ }
+
+ return buf - (char*)buf_in;
+}
+
+void IInputStream::LoadOrFail(void* buf, size_t len) {
+ const size_t realLen = Load(buf, len);
+ if (Y_UNLIKELY(realLen != len)) {
+ ythrow yexception() << "Failed to read required number of bytes from stream! Expected: " << len << ", gained: " << realLen << "!";
+ }
+}
+
+size_t IInputStream::ReadLine(TString& st) {
+ const size_t ret = ReadTo(st, '\n');
+
+ if (ret && !st.empty() && st.back() == '\r') {
+ st.pop_back();
+ }
+
+ return ret;
+}
+
+size_t IInputStream::ReadLine(TUtf16String& w) {
+ TString s;
+ size_t result = ReadLine(s);
+
+ if (result) {
+ UTF8ToWide(s, w);
+ }
+
+ return result;
+}
+
+TString IInputStream::ReadLine() {
+ TString ret;
+
+ if (!ReadLine(ret)) {
+ ythrow yexception() << "can not read line from stream";
+ }
+
+ return ret;
+}
+
+TString IInputStream::ReadTo(char ch) {
+ TString ret;
+
+ if (!ReadTo(ret, ch)) {
+ ythrow yexception() << "can not read from stream";
+ }
+
+ return ret;
+}
+
+size_t IInputStream::Skip(size_t sz) {
+ return DoSkip(sz);
+}
+
+size_t IInputStream::DoSkip(size_t sz) {
+ if (sz < 128) {
+ return Load(alloca(sz), sz);
+ }
+
+ TTempBuf buf;
+ size_t total = 0;
+
+ while (sz) {
+ const size_t lresult = Read(buf.Data(), Min<size_t>(sz, buf.Size()));
+
+ if (lresult == 0) {
+ return total;
+ }
+
+ total += lresult;
+ sz -= lresult;
+ }
+
+ return total;
+}
+
+TString IInputStream::ReadAll() {
+ TString result;
+ TStringOutput stream(result);
+
+ DoReadAll(stream);
+
+ return result;
+}
+
+ui64 IInputStream::ReadAll(IOutputStream& out) {
+ return DoReadAll(out);
+}
+
+ui64 TransferData(IInputStream* in, IOutputStream* out) {
+ return in->ReadAll(*out);
+}
+
+namespace {
+ struct TStdIn: public IInputStream {
+ ~TStdIn() override = default;
+
+ size_t DoRead(void* buf, size_t len) override {
+ const size_t ret = fread(buf, 1, len, F_);
+
+ if (ret < len && ferror(F_)) {
+ ythrow TSystemError() << "can not read from stdin";
+ }
+
+ return ret;
+ }
+
+ FILE* F_ = stdin;
+ };
+
+#if defined(_win_)
+ using TGetLine = TStdIn;
+#else
+ #if defined(_bionic_)
+ using TGetLineBase = TStdIn;
+ #else
+ struct TGetLineBase: public TStdIn {
+ ~TGetLineBase() override {
+ free(B_);
+ }
+
+ size_t DoReadTo(TString& st, char ch) override {
+ auto&& guard = Guard(M_);
+
+ (void)guard;
+
+ const auto r = getdelim(&B_, &L_, ch, F_);
+
+ if (r < 0) {
+ if (ferror(F_)) {
+ ythrow TSystemError() << "can not read from stdin";
+ }
+
+ st.clear();
+
+ return 0;
+ }
+
+ st.AssignNoAlias(B_, r);
+
+ if (st && st.back() == ch) {
+ st.pop_back();
+ }
+
+ return r;
+ }
+
+ TAdaptiveLock M_;
+ char* B_ = nullptr;
+ size_t L_ = 0;
+ };
+ #endif
+
+ #if defined(_glibc_) || defined(_cygwin_)
+ // glibc does not have fgetln
+ using TGetLine = TGetLineBase;
+ #else
+ struct TGetLine: public TGetLineBase {
+ size_t DoReadTo(TString& st, char ch) override {
+ if (ch == '\n') {
+ size_t len = 0;
+ auto r = fgetln(F_, &len);
+
+ if (r) {
+ st.AssignNoAlias(r, len);
+
+ if (st && st.back() == '\n') {
+ st.pop_back();
+ }
+
+ return len;
+ }
+ }
+
+ return TGetLineBase::DoReadTo(st, ch);
+ }
+ };
+ #endif
+#endif
+}
+
+IInputStream& NPrivate::StdInStream() noexcept {
+ return *SingletonWithPriority<TGetLine, 4>();
+}
+
+// implementation of >> operator
+
+// helper functions
+
+static inline bool IsStdDelimiter(char c) {
+ return (c == '\0') || (c == ' ') || (c == '\r') || (c == '\n') || (c == '\t');
+}
+
+static void ReadUpToDelimiter(IInputStream& i, TString& s) {
+ char c;
+ while (i.ReadChar(c)) { // skip delimiters
+ if (!IsStdDelimiter(c)) {
+ s += c;
+ break;
+ }
+ }
+ while (i.ReadChar(c) && !IsStdDelimiter(c)) { // read data (with trailing delimiter)
+ s += c;
+ }
+}
+
+// specialization for string-related stuff
+
+template <>
+void In<TString>(IInputStream& i, TString& s) {
+ s.resize(0);
+ ReadUpToDelimiter(i, s);
+}
+
+template <>
+void In<TUtf16String>(IInputStream& i, TUtf16String& w) {
+ TString s;
+ ReadUpToDelimiter(i, s);
+
+ if (s.empty()) {
+ w.erase();
+ } else {
+ w = UTF8ToWide(s);
+ }
+}
+
+// specialization for char types
+
+#define SPEC_FOR_CHAR(T) \
+ template <> \
+ void In<T>(IInputStream & i, T & t) { \
+ i.ReadChar((char&)t); \
+ }
+
+SPEC_FOR_CHAR(char)
+SPEC_FOR_CHAR(unsigned char)
+SPEC_FOR_CHAR(signed char)
+
+#undef SPEC_FOR_CHAR
+
+// specialization for number types
+
+#define SPEC_FOR_NUMBER(T) \
+ template <> \
+ void In<T>(IInputStream & i, T & t) { \
+ char buf[128]; \
+ size_t pos = 0; \
+ while (i.ReadChar(buf[0])) { \
+ if (!IsStdDelimiter(buf[0])) { \
+ ++pos; \
+ break; \
+ } \
+ } \
+ while (i.ReadChar(buf[pos]) && !IsStdDelimiter(buf[pos]) && pos < 127) { \
+ ++pos; \
+ } \
+ t = FromString<T, char>(buf, pos); \
+ }
+
+SPEC_FOR_NUMBER(signed short)
+SPEC_FOR_NUMBER(signed int)
+SPEC_FOR_NUMBER(signed long int)
+SPEC_FOR_NUMBER(signed long long int)
+SPEC_FOR_NUMBER(unsigned short)
+SPEC_FOR_NUMBER(unsigned int)
+SPEC_FOR_NUMBER(unsigned long int)
+SPEC_FOR_NUMBER(unsigned long long int)
+
+SPEC_FOR_NUMBER(float)
+SPEC_FOR_NUMBER(double)
+SPEC_FOR_NUMBER(long double)
+
+#undef SPEC_FOR_NUMBER
diff --git a/util/stream/input.h b/util/stream/input.h
new file mode 100644
index 0000000000..f0d5807ed2
--- /dev/null
+++ b/util/stream/input.h
@@ -0,0 +1,273 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+#include <util/generic/noncopyable.h>
+#include <util/system/defaults.h>
+
+class IOutputStream;
+
+/**
+ * @addtogroup Streams_Base
+ * @{
+ */
+
+/**
+ * Abstract input stream.
+ */
+class IInputStream: public TNonCopyable {
+public:
+ IInputStream() noexcept;
+ virtual ~IInputStream();
+
+ IInputStream(IInputStream&&) noexcept {
+ }
+
+ IInputStream& operator=(IInputStream&&) noexcept {
+ return *this;
+ }
+
+ /**
+ * Reads some data from the stream. Note that this function might read less
+ * data than what was requested. Use `Load` function if you want to read as
+ * much data as possible.
+ *
+ * @param buf Buffer to read into.
+ * @param len Number of bytes to read.
+ * @returns Number of bytes that were actually read.
+ * A return value of zero signals end of stream.
+ */
+ inline size_t Read(void* buf, size_t len) {
+ if (len == 0) {
+ return 0;
+ }
+
+ return DoRead(buf, len);
+ }
+
+ /**
+ * Reads one character from the stream.
+ *
+ * @param[out] c Character to read.
+ * @returns Whether the character was read.
+ * A return value of false signals the end
+ * of stream.
+ */
+ inline bool ReadChar(char& c) {
+ return DoRead(&c, 1) > 0;
+ }
+
+ /**
+ * Reads all characters from the stream until the given character is
+ * encountered, and stores them into the given string. The character itself
+ * is read from the stream, but not stored in the string.
+ *
+ * @param[out] st String to read into.
+ * @param ch Character to stop at.
+ * @returns Total number of characters read from the stream.
+ * A return value of zero signals end of stream.
+ */
+ inline size_t ReadTo(TString& st, char ch) {
+ return DoReadTo(st, ch);
+ }
+
+ /**
+ * Reads the requested amount of data from the stream. Unlike `Read`, this
+ * function stops only when the requested amount of data is read, or when
+ * end of stream is reached.
+ *
+ * @param buf Buffer to read into.
+ * @param len Number of bytes to read.
+ * @returns Number of bytes that were actually read.
+ * A return value different from `len`
+ * signals end of stream.
+ */
+ size_t Load(void* buf, size_t len);
+
+ /**
+ * Reads the requested amount of data from the stream, or fails with an
+ * exception if unable to do so.
+ *
+ * @param buf Buffer to read into.
+ * @param len Number of bytes to read.
+ * @see Load
+ */
+ void LoadOrFail(void* buf, size_t len);
+
+ /**
+ * Reads all data from this stream and returns it as a string.
+ *
+ * @returns Contents of this stream as a string.
+ */
+ TString ReadAll();
+
+ /**
+ * Reads all data from this stream and writes it into a provided output
+ * stream.
+ *
+ * @param out Output stream to use.
+ * @returns Total number of characters read from the stream.
+ */
+ ui64 ReadAll(IOutputStream& out);
+
+ /**
+ * Reads all data from the stream until the first occurrence of '\n'. Also
+ * handles Windows line breaks correctly.
+ *
+ * @returns Next line read from this stream,
+ * excluding the line terminator.
+ * @throws yexception If no data could be read from a stream
+ * because end of stream has already been
+ * reached.
+ */
+ TString ReadLine();
+
+ /**
+ * Reads all characters from the stream until the given character is
+ * encountered and returns them as a string. The character itself is read
+ * from the stream, but not stored in the string.
+ *
+ * @param ch Character to stop at.
+ * @returns String containing all the characters read.
+ * @throws yexception If no data could be read from a stream
+ * because end of stream has already been
+ * reached.
+ */
+ TString ReadTo(char ch);
+
+ /**
+ * Reads all data from the stream until the first occurrence of '\n' and
+ * stores it into provided string. Also handles Windows line breaks correctly.
+ *
+ * @param[out] st String to store read characters into,
+ * excluding the line terminator.
+ * @returns Total number of characters read from the stream.
+ * A return value of zero signals end of stream.
+ */
+ size_t ReadLine(TString& st);
+
+ /**
+ * Reads UTF8 encoded characters from the stream the first occurrence of '\n',
+ * converts them into wide ones, and stores into provided string. Also handles
+ * Windows line breaks correctly.
+ *
+ * @param[out] w Wide string to store read characters into,
+ * excluding the line terminator.
+ * @returns Total number of characters read from the stream.
+ * A return value of zero signals end of stream.
+ */
+ size_t ReadLine(TUtf16String& w);
+
+ /**
+ * Skips some data from the stream without reading / copying it. Note that
+ * this function might skip less data than what was requested.
+ *
+ * @param len Number of bytes to skip.
+ * @returns Number of bytes that were actually skipped.
+ * A return value of zero signals end of stream.
+ */
+ size_t Skip(size_t len);
+
+protected:
+ /**
+ * Reads some data from the stream. Might read less data than what was
+ * requested.
+ *
+ * @param buf Buffer to read into.
+ * @param len Number of bytes to read.
+ * @returns Number of bytes that were actually read.
+ * A return value of zero signals end of stream.
+ * @throws yexception If IO error occurs.
+ */
+ virtual size_t DoRead(void* buf, size_t len) = 0;
+
+ /**
+ * Skips some data from the stream. Might skip less data than what was
+ * requested.
+ *
+ * @param len Number of bytes to skip.
+ * @returns Number of bytes that were actually skipped.
+ * A return value of zero signals end of stream.
+ * @throws yexception If IO error occurs.
+ */
+ virtual size_t DoSkip(size_t len);
+
+ /**
+ * Reads all characters from the stream until the given character is
+ * encountered, and stores them into the given string. The character itself
+ * is read from the stream, but not stored in the string.
+ *
+ * Provided string is cleared only if there is data in the stream.
+ *
+ * @param[out] st String to read into.
+ * @param ch Character to stop at.
+ * @returns Total number of characters read from the stream.
+ * A return value of zero signals end of stream.
+ * @throws yexception If IO error occurs.
+ */
+ virtual size_t DoReadTo(TString& st, char ch);
+
+ /**
+ * Reads all data from this stream and writes it into a provided output
+ * stream.
+ *
+ * @param out Output stream to use.
+ * @returns Total number of characters read from
+ * this stream.
+ * @throws yexception If IO error occurs.
+ */
+ virtual ui64 DoReadAll(IOutputStream& out);
+};
+
+/**
+ * Transfers all data from the given input stream into the given output stream.
+ *
+ * @param in Input stream.
+ * @param out Output stream.
+ */
+ui64 TransferData(IInputStream* in, IOutputStream* out);
+
+/**
+ * `operator>>` for `IInputStream` by default delegates to this function.
+ *
+ * Note that while `operator>>` uses overloading (and thus argument-dependent
+ * lookup), `In` uses template specializations. This makes it possible to
+ * have a single `In` declaration, and then just provide specializations in
+ * cpp files, letting the linker figure everything else out. This approach
+ * reduces compilation times.
+ *
+ * However, if the flexibility of overload resolution is needed, then one should
+ * just overload `operator>>`.
+ *
+ * @param in Input stream to read from.
+ * @param[out] value Value to read.
+ * @throws `yexception` on invalid input or end of stream.
+ * @see Out(IOutputStream&, T&)
+ */
+template <typename T>
+void In(IInputStream& in, T& value);
+
+/**
+ * Reads a value from the stream.
+ *
+ * @param in Input stream to read from.
+ * @param[out] value Value to read.
+ * @returns Input stream.
+ * @throws `yexception` on invalid input or end of stream.
+ * @see operator<<(IOutputStream&, T&)
+ */
+template <typename T>
+inline IInputStream& operator>>(IInputStream& in, T& value) {
+ In<T>(in, value);
+ return in;
+}
+
+namespace NPrivate {
+ IInputStream& StdInStream() noexcept;
+}
+
+/**
+ * Standard input stream.
+ */
+#define Cin (::NPrivate::StdInStream())
+
+/** @} */
diff --git a/util/stream/input_ut.cpp b/util/stream/input_ut.cpp
new file mode 100644
index 0000000000..4a93f5458e
--- /dev/null
+++ b/util/stream/input_ut.cpp
@@ -0,0 +1,157 @@
+#include "input.h"
+#include "output.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/file.h>
+#include <util/system/yassert.h>
+
+#ifdef _win_
+ #include <io.h>
+#endif
+
+class TMockStdIn {
+public:
+ TMockStdIn()
+ : StdInCopy_(dup(0))
+ {
+ }
+ ~TMockStdIn() {
+ close(StdInCopy_);
+ }
+
+ template <typename FuncType>
+ void ForInput(const TStringBuf text, const FuncType& func) {
+ TFile tempFile(TFile::Temporary("input_ut"));
+ tempFile.Write(text.data(), text.size());
+ tempFile.FlushData();
+ tempFile.Seek(0, sSet);
+
+ TFileHandle tempFh(tempFile.GetHandle());
+ tempFh.Duplicate2Posix(0);
+ tempFh.Release();
+
+ func();
+ Cin.ReadAll();
+ dup2(StdInCopy_, 0);
+ clearerr(stdin);
+ }
+
+private:
+ int StdInCopy_;
+};
+
+class TNoInput: public IInputStream {
+public:
+ TNoInput(ui64 size)
+ : Size_(size)
+ {
+ }
+
+protected:
+ size_t DoRead(void*, size_t len) override {
+ len = Min(static_cast<ui64>(len), Size_);
+ Size_ -= len;
+ return len;
+ }
+
+private:
+ ui64 Size_;
+};
+
+class TNoOutput: public IOutputStream {
+public:
+ TNoOutput() = default;
+
+protected:
+ void DoWrite(const void*, size_t) override {
+ }
+};
+
+class TSimpleStringInput: public IInputStream {
+public:
+ TSimpleStringInput(const TString& string)
+ : String_(string)
+ {
+ }
+
+protected:
+ size_t DoRead(void* buf, size_t len) override {
+ Y_ASSERT(len != 0);
+
+ if (String_.empty()) {
+ return 0;
+ }
+
+ *static_cast<char*>(buf) = String_[0];
+ String_.remove(0, 1);
+ return 1;
+ }
+
+private:
+ TString String_;
+};
+
+Y_UNIT_TEST_SUITE(TInputTest) {
+ Y_UNIT_TEST(BigTransfer) {
+ ui64 size = 1024ull * 1024ull * 1024ull * 5;
+ TNoInput input(size);
+ TNoOutput output;
+
+ ui64 transferred = TransferData(&input, &output);
+
+ UNIT_ASSERT_VALUES_EQUAL(transferred, size);
+ }
+
+ Y_UNIT_TEST(TestReadTo) {
+ /* This one tests default implementation of ReadTo. */
+
+ TSimpleStringInput in("0123456789abc");
+
+ TString t;
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '7'), 8);
+ UNIT_ASSERT_VALUES_EQUAL(t, "0123456");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 5);
+ UNIT_ASSERT_VALUES_EQUAL(t, "89abc");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 0);
+ UNIT_ASSERT_VALUES_EQUAL(t, "89abc");
+ }
+
+ Y_UNIT_TEST(TestReadLine) {
+ TSimpleStringInput in("1\n22\n333");
+
+ TString t;
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 2);
+ UNIT_ASSERT_VALUES_EQUAL(t, "1");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 3);
+ UNIT_ASSERT_VALUES_EQUAL(t, "22");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 3);
+ UNIT_ASSERT_VALUES_EQUAL(t, "333");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 0);
+ UNIT_ASSERT_VALUES_EQUAL(t, "333");
+ }
+
+ Y_UNIT_TEST(TestStdInReadTo) {
+ std::pair<std::pair<TStringBuf, char>, TStringBuf> testPairs[] = {
+ {{"", '\n'}, ""},
+ {{"\n", '\n'}, ""},
+ {{"\n\t", '\t'}, "\n"},
+ {{"\t\n", '\n'}, "\t"},
+ {{"a\tb\n", '\t'}, "a"}};
+
+ TMockStdIn stdIn;
+
+ for (const auto& testPair : testPairs) {
+ const TStringBuf text = testPair.first.first;
+ const char delim = testPair.first.second;
+ const TStringBuf expectedValue = testPair.second;
+
+ stdIn.ForInput(text,
+ [=] {
+ TString value;
+ Cin.ReadTo(value, delim);
+ UNIT_ASSERT_VALUES_EQUAL(value, expectedValue);
+ });
+ }
+ }
+}
diff --git a/util/stream/ios_ut.cpp b/util/stream/ios_ut.cpp
new file mode 100644
index 0000000000..139f4296e5
--- /dev/null
+++ b/util/stream/ios_ut.cpp
@@ -0,0 +1,497 @@
+#include "output.h"
+#include "tokenizer.h"
+#include "buffer.h"
+#include "buffered.h"
+#include "walk.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/string/cast.h>
+#include <util/memory/tempbuf.h>
+#include <util/charset/wide.h>
+
+#include <string>
+#include <iostream>
+
+class TStreamsTest: public TTestBase {
+ UNIT_TEST_SUITE(TStreamsTest);
+ UNIT_TEST(TestGenericRead);
+ UNIT_TEST(TestGenericWrite);
+ UNIT_TEST(TestReadLine);
+ UNIT_TEST(TestMemoryStream);
+ UNIT_TEST(TestBufferedIO);
+ UNIT_TEST(TestBufferStream);
+ UNIT_TEST(TestStringStream);
+ UNIT_TEST(TestWtrokaInput);
+ UNIT_TEST(TestStrokaInput);
+ UNIT_TEST(TestReadTo);
+ UNIT_TEST(TestWtrokaOutput);
+ UNIT_TEST(TestIStreamOperators);
+ UNIT_TEST(TestWchar16Output);
+ UNIT_TEST(TestWchar32Output);
+ UNIT_TEST(TestUtf16StingOutputByChars);
+ UNIT_TEST_SUITE_END();
+
+public:
+ void TestGenericRead();
+ void TestGenericWrite();
+ void TestReadLine();
+ void TestMemoryStream();
+ void TestBufferedIO();
+ void TestBufferStream();
+ void TestStringStream();
+ void TestWtrokaInput();
+ void TestStrokaInput();
+ void TestWtrokaOutput();
+ void TestIStreamOperators();
+ void TestReadTo();
+ void TestWchar16Output();
+ void TestWchar32Output();
+ void TestUtf16StingOutputByChars();
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TStreamsTest);
+
+void TStreamsTest::TestIStreamOperators() {
+ TString data("first line\r\nsecond\t\xd1\x82\xd0\xb5\xd1\x81\xd1\x82 line\r\n 1 -4 59 4320000000009999999 c\n -1.5 1e-110");
+ TStringInput si(data);
+
+ TString l1;
+ TString l2;
+ TString l3;
+ TUtf16String w1;
+ TString l4;
+ ui16 i1;
+ i16 i2;
+ i32 i3;
+ ui64 i4;
+ char c1;
+ unsigned char c2;
+ float f1;
+ double f2;
+
+ si >> l1 >> l2 >> l3 >> w1 >> l4 >> i1 >> i2 >> i3 >> i4 >> c1 >> c2 >> f1 >> f2;
+
+ UNIT_ASSERT_EQUAL(l1, "first");
+ UNIT_ASSERT_EQUAL(l2, "line");
+ UNIT_ASSERT_EQUAL(l3, "second");
+ UNIT_ASSERT_EQUAL(l4, "line");
+ UNIT_ASSERT_EQUAL(i1, 1);
+ UNIT_ASSERT_EQUAL(i2, -4);
+ UNIT_ASSERT_EQUAL(i3, 59);
+ UNIT_ASSERT_EQUAL(i4, 4320000000009999999ULL);
+ UNIT_ASSERT_EQUAL(c1, 'c');
+ UNIT_ASSERT_EQUAL(c2, '\n');
+ UNIT_ASSERT_EQUAL(f1, -1.5);
+ UNIT_ASSERT_EQUAL(f2, 1e-110);
+}
+
+void TStreamsTest::TestStringStream() {
+ TStringStream s;
+
+ s << "qw\r\n1234"
+ << "\n"
+ << 34;
+
+ UNIT_ASSERT_EQUAL(s.ReadLine(), "qw");
+ UNIT_ASSERT_EQUAL(s.ReadLine(), "1234");
+
+ s << "\r\n"
+ << 123.1;
+
+ UNIT_ASSERT_EQUAL(s.ReadLine(), "34");
+ UNIT_ASSERT_EQUAL(s.ReadLine(), "123.1");
+
+ UNIT_ASSERT_EQUAL(s.Str(), "qw\r\n1234\n34\r\n123.1");
+
+ // Test stream copying
+ TStringStream sc = s;
+
+ s << "-666-" << 13;
+ sc << "-777-" << 0 << "JackPot";
+
+ UNIT_ASSERT_EQUAL(s.Str(), "qw\r\n1234\n34\r\n123.1-666-13");
+ UNIT_ASSERT_EQUAL(sc.Str(), "qw\r\n1234\n34\r\n123.1-777-0JackPot");
+
+ TStringStream ss;
+ ss = s;
+ s << "... and some trash";
+ UNIT_ASSERT_EQUAL(ss.Str(), "qw\r\n1234\n34\r\n123.1-666-13");
+}
+
+void TStreamsTest::TestGenericRead() {
+ TString s("1234567890");
+ TStringInput si(s);
+ char buf[1024];
+
+ UNIT_ASSERT_EQUAL(si.Read(buf, 6), 6);
+ UNIT_ASSERT_EQUAL(memcmp(buf, "123456", 6), 0);
+ UNIT_ASSERT_EQUAL(si.Read(buf, 6), 4);
+ UNIT_ASSERT_EQUAL(memcmp(buf, "7890", 4), 0);
+}
+
+void TStreamsTest::TestGenericWrite() {
+ TString s;
+ TStringOutput so(s);
+
+ so.Write("123456", 6);
+ so.Write("7890", 4);
+
+ UNIT_ASSERT_EQUAL(s, "1234567890");
+}
+
+void TStreamsTest::TestReadLine() {
+ TString data("1234\r\n5678\nqw");
+ TStringInput si(data);
+
+ UNIT_ASSERT_EQUAL(si.ReadLine(), "1234");
+ UNIT_ASSERT_EQUAL(si.ReadLine(), "5678");
+ UNIT_ASSERT_EQUAL(si.ReadLine(), "qw");
+}
+
+void TStreamsTest::TestMemoryStream() {
+ char buf[1024];
+ TMemoryOutput mo(buf, sizeof(buf));
+ bool ehandled = false;
+
+ try {
+ for (size_t i = 0; i < sizeof(buf) + 1; ++i) {
+ mo.Write(i % 127);
+ }
+ } catch (...) {
+ ehandled = true;
+ }
+
+ UNIT_ASSERT_EQUAL(ehandled, true);
+
+ for (size_t i = 0; i < sizeof(buf); ++i) {
+ UNIT_ASSERT_EQUAL(buf[i], (char)(i % 127));
+ }
+}
+
+class TMyStringOutput: public IOutputStream {
+public:
+ inline TMyStringOutput(TString& s, size_t buflen) noexcept
+ : S_(s)
+ , BufLen_(buflen)
+ {
+ }
+
+ ~TMyStringOutput() override = default;
+
+ void DoWrite(const void* data, size_t len) override {
+ S_.Write(data, len);
+ UNIT_ASSERT(len < BufLen_ || ((len % BufLen_) == 0));
+ }
+
+ void DoWriteV(const TPart* p, size_t count) override {
+ TString s;
+
+ for (size_t i = 0; i < count; ++i) {
+ s.append((const char*)p[i].buf, p[i].len);
+ }
+
+ DoWrite(s.data(), s.size());
+ }
+
+private:
+ TStringOutput S_;
+ const size_t BufLen_;
+};
+
+void TStreamsTest::TestBufferedIO() {
+ TString s;
+
+ {
+ const size_t buflen = 7;
+ TBuffered<TMyStringOutput> bo(buflen, s, buflen);
+
+ for (size_t i = 0; i < 1000; ++i) {
+ TString str(" ");
+ str += ToString(i % 10);
+
+ bo.Write(str.data(), str.size());
+ }
+
+ bo.Finish();
+ }
+
+ UNIT_ASSERT_EQUAL(s.size(), 2000);
+
+ {
+ const size_t buflen = 11;
+ TBuffered<TStringInput> bi(buflen, s);
+
+ for (size_t i = 0; i < 1000; ++i) {
+ TString str(" ");
+ str += ToString(i % 10);
+
+ char buf[3];
+
+ UNIT_ASSERT_EQUAL(bi.Load(buf, 2), 2);
+
+ buf[2] = 0;
+
+ UNIT_ASSERT_EQUAL(str, buf);
+ }
+ }
+
+ s.clear();
+
+ {
+ const size_t buflen = 13;
+ TBuffered<TMyStringOutput> bo(buflen, s, buflen);
+ TString f = "1234567890";
+
+ for (size_t i = 0; i < 10; ++i) {
+ f += f;
+ }
+
+ for (size_t i = 0; i < 1000; ++i) {
+ bo.Write(f.data(), i);
+ }
+
+ bo.Finish();
+ }
+}
+
+void TStreamsTest::TestBufferStream() {
+ TBufferStream stream;
+ TString s = "test";
+
+ stream.Write(s.data(), s.size());
+ char buf[5];
+ size_t bytesRead = stream.Read(buf, 4);
+ UNIT_ASSERT_EQUAL(4, bytesRead);
+ UNIT_ASSERT_EQUAL(0, strncmp(s.data(), buf, 4));
+
+ stream.Write(s.data(), s.size());
+ bytesRead = stream.Read(buf, 2);
+ UNIT_ASSERT_EQUAL(2, bytesRead);
+ UNIT_ASSERT_EQUAL(0, strncmp("te", buf, 2));
+
+ bytesRead = stream.Read(buf, 2);
+ UNIT_ASSERT_EQUAL(2, bytesRead);
+ UNIT_ASSERT_EQUAL(0, strncmp("st", buf, 2));
+
+ bytesRead = stream.Read(buf, 2);
+ UNIT_ASSERT_EQUAL(0, bytesRead);
+}
+
+namespace {
+ class TStringListInput: public IWalkInput {
+ public:
+ TStringListInput(const TVector<TString>& data)
+ : Data_(data)
+ , Index_(0)
+ {
+ }
+
+ protected:
+ size_t DoUnboundedNext(const void** ptr) override {
+ if (Index_ >= Data_.size()) {
+ return 0;
+ }
+
+ const TString& string = Data_[Index_++];
+
+ *ptr = string.data();
+ return string.size();
+ }
+
+ private:
+ const TVector<TString>& Data_;
+ size_t Index_;
+ };
+
+ const char Text[] =
+ // UTF8 encoded "one \ntwo\r\nthree\n\tfour\nfive\n" in russian and ...
+ "один \n"
+ "два\r\n"
+ "три\n"
+ "\tчетыре\n"
+ "пять\n"
+ // ... additional test cases
+ "\r\n"
+ "\n\r" // this char goes to the front of the next string
+ "one two\n"
+ "123\r\n"
+ "\t\r ";
+
+ const char* Expected[] = {
+ // UTF8 encoded "one ", "two", "three", "\tfour", "five" in russian and ...
+ "один ",
+ "два",
+ "три",
+ "\tчетыре",
+ "пять",
+ // ... additional test cases
+ "",
+ "",
+ "\rone two",
+ "123",
+ "\t\r "};
+ void TestStreamReadTo1(IInputStream& input, const char* comment) {
+ TString tmp;
+ input.ReadTo(tmp, 'c');
+ UNIT_ASSERT_VALUES_EQUAL_C(tmp, "111a222b333", comment);
+
+ char tmp2;
+ input.Read(&tmp2, 1);
+ UNIT_ASSERT_VALUES_EQUAL_C(tmp2, '4', comment);
+
+ input.ReadTo(tmp, '6');
+ UNIT_ASSERT_VALUES_EQUAL_C(tmp, "44d555e", comment);
+
+ tmp = input.ReadAll();
+ UNIT_ASSERT_VALUES_EQUAL_C(tmp, "66f", comment);
+ }
+
+ void TestStreamReadTo2(IInputStream& input, const char* comment) {
+ TString s;
+ size_t i = 0;
+ while (input.ReadLine(s)) {
+ UNIT_ASSERT_C(i < Y_ARRAY_SIZE(Expected), comment);
+ UNIT_ASSERT_VALUES_EQUAL_C(s, Expected[i], comment);
+ ++i;
+ }
+ }
+
+ void TestStreamReadTo3(IInputStream& input, const char* comment) {
+ UNIT_ASSERT_VALUES_EQUAL_C(input.ReadLine(), "111a222b333c444d555e666f", comment);
+ }
+
+ void TestStreamReadTo4(IInputStream& input, const char* comment) {
+ UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "one", comment);
+ UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "two", comment);
+ UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "three", comment);
+ }
+
+ void TestStrokaInput(IInputStream& input, const char* comment) {
+ TString line;
+ ui32 i = 0;
+ TInstant start = Now();
+ while (input.ReadLine(line)) {
+ ++i;
+ }
+ Cout << comment << ":" << (Now() - start).SecondsFloat() << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(i, 100000);
+ }
+
+ template <class T>
+ void TestStreamReadTo(const TString& text, T test) {
+ TStringInput is(text);
+ test(is, "TStringInput");
+ TMemoryInput mi(text.data(), text.size());
+ test(mi, "TMemoryInput");
+ TBuffer b(text.data(), text.size());
+ TBufferInput bi(b);
+ test(bi, "TBufferInput");
+ TStringInput slave(text);
+ TBufferedInput bdi(&slave);
+ test(bdi, "TBufferedInput");
+ TVector<TString> lst(1, text);
+ TStringListInput sli(lst);
+ test(sli, "IWalkInput");
+ }
+}
+
+void TStreamsTest::TestReadTo() {
+ TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo1);
+ TestStreamReadTo(Text, TestStreamReadTo2);
+ TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo3);
+ TString withZero = "one";
+ withZero.append('\0').append("two").append('\0').append("three");
+ TestStreamReadTo(withZero, TestStreamReadTo4);
+}
+
+void TStreamsTest::TestStrokaInput() {
+ TString s;
+ for (ui32 i = 0; i < 100000; ++i) {
+ TVector<char> d(i % 1000, 'a');
+ s.append(d.data(), d.size());
+ s.append('\n');
+ }
+ TestStreamReadTo(s, ::TestStrokaInput);
+}
+
+void TStreamsTest::TestWtrokaInput() {
+ const TString s(Text);
+ TStringInput is(s);
+ TUtf16String w;
+ size_t i = 0;
+
+ while (is.ReadLine(w)) {
+ UNIT_ASSERT(i < Y_ARRAY_SIZE(Expected));
+ UNIT_ASSERT_VALUES_EQUAL(w, UTF8ToWide(Expected[i]));
+
+ ++i;
+ }
+}
+
+void TStreamsTest::TestWtrokaOutput() {
+ TString s;
+ TStringOutput os(s);
+ const size_t n = sizeof(Expected) / sizeof(Expected[0]);
+
+ for (size_t i = 0; i < n; ++i) {
+ TUtf16String w = UTF8ToWide(Expected[i]);
+
+ os << w;
+
+ if (i == 1 || i == 5 || i == 8) {
+ os << '\r';
+ }
+
+ if (i < n - 1) {
+ os << '\n';
+ }
+ }
+
+ UNIT_ASSERT(s == Text);
+}
+
+void TStreamsTest::TestWchar16Output() {
+ TString s;
+ TStringOutput os(s);
+ os << wchar16(97); // latin a
+ os << u'\u044E'; // cyrillic ю
+ os << u'я';
+ os << wchar16(0xD801); // high surrogate is printed as replacement character U+FFFD
+ os << u'b';
+
+ UNIT_ASSERT_VALUES_EQUAL(s, "aюя"
+ "\xEF\xBF\xBD"
+ "b");
+}
+
+void TStreamsTest::TestWchar32Output() {
+ TString s;
+ TStringOutput os(s);
+ os << wchar32(97); // latin a
+ os << U'\u044E'; // cyrillic ю
+ os << U'я';
+ os << U'\U0001F600'; // grinning face
+ os << u'b';
+
+ UNIT_ASSERT_VALUES_EQUAL(s, "aюя"
+ "\xF0\x9F\x98\x80"
+ "b");
+}
+
+void TStreamsTest::TestUtf16StingOutputByChars() {
+ TString s = "\xd1\x87\xd0\xb8\xd1\x81\xd1\x82\xd0\xb8\xd1\x87\xd0\xb8\xd1\x81\xd1\x82\xd0\xb8";
+ TUtf16String w = UTF8ToWide(s);
+
+ UNIT_ASSERT_VALUES_EQUAL(w.size(), 10);
+
+ TStringStream stream0;
+ stream0 << w;
+ UNIT_ASSERT_VALUES_EQUAL(stream0.Str(), s);
+
+ TStringStream stream1;
+ for (size_t i = 0; i < 10; i++) {
+ stream1 << w[i];
+ }
+ UNIT_ASSERT_VALUES_EQUAL(stream1.Str(), s);
+}
diff --git a/util/stream/labeled.cpp b/util/stream/labeled.cpp
new file mode 100644
index 0000000000..56a886ca30
--- /dev/null
+++ b/util/stream/labeled.cpp
@@ -0,0 +1 @@
+#include "labeled.h"
diff --git a/util/stream/labeled.h b/util/stream/labeled.h
new file mode 100644
index 0000000000..2cc539d241
--- /dev/null
+++ b/util/stream/labeled.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <util/generic/va_args.h>
+
+/**
+ * Generates an output sequence for the provided expressions that is formatted
+ * as a labeled comma-separated list.
+ *
+ * Example usage:
+ * @code
+ * int a = 1, b = 2, c = 3;
+ * stream << LabeledOutput(a, b, c, a + b + c);
+ * // Outputs "a = 1, b = 2, c = 3, a + b + c = 6"
+ * @endcode
+ */
+#define LabeledOutput(...) "" Y_PASS_VA_ARGS(Y_MAP_ARGS_WITH_LAST(__LABELED_OUTPUT_NONLAST__, __LABELED_OUTPUT_IMPL__, __VA_ARGS__))
+
+#define __LABELED_OUTPUT_IMPL__(x) << #x " = " << (x)
+#define __LABELED_OUTPUT_NONLAST__(x) __LABELED_OUTPUT_IMPL__(x) << ", "
diff --git a/util/stream/labeled_ut.cpp b/util/stream/labeled_ut.cpp
new file mode 100644
index 0000000000..12d0dc5004
--- /dev/null
+++ b/util/stream/labeled_ut.cpp
@@ -0,0 +1,12 @@
+#include "str.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TLabeledOutputTest) {
+ Y_UNIT_TEST(TBasicTest) {
+ TStringStream out;
+ int x = 3;
+ out << LabeledOutput(x, 1, 2, 3 + 4);
+ UNIT_ASSERT_STRINGS_EQUAL(out.Str(), "x = 3, 1 = 1, 2 = 2, 3 + 4 = 7");
+ }
+}
diff --git a/util/stream/length.cpp b/util/stream/length.cpp
new file mode 100644
index 0000000000..9907fe2ac9
--- /dev/null
+++ b/util/stream/length.cpp
@@ -0,0 +1,47 @@
+#include "length.h"
+
+size_t TLengthLimitedInput::DoRead(void* buf, size_t len) {
+ const size_t toRead = (size_t)Min<ui64>(Length_, len);
+ const size_t ret = Slave_->Read(buf, toRead);
+
+ Length_ -= ret;
+
+ return ret;
+}
+
+size_t TLengthLimitedInput::DoSkip(size_t len) {
+ len = (size_t)Min<ui64>((ui64)len, Length_);
+ len = Slave_->Skip(len);
+ Length_ -= len;
+ return len;
+}
+
+size_t TCountingInput::DoRead(void* buf, size_t len) {
+ const size_t ret = Slave_->Read(buf, len);
+ Count_ += ret;
+ return ret;
+}
+
+size_t TCountingInput::DoSkip(size_t len) {
+ const size_t ret = Slave_->Skip(len);
+ Count_ += ret;
+ return ret;
+}
+
+size_t TCountingInput::DoReadTo(TString& st, char ch) {
+ const size_t ret = Slave_->ReadTo(st, ch);
+ Count_ += ret;
+ return ret;
+}
+
+ui64 TCountingInput::DoReadAll(IOutputStream& out) {
+ const ui64 ret = Slave_->ReadAll(out);
+ Count_ += ret;
+ return ret;
+}
+
+void TCountingOutput::DoWrite(const void* buf, size_t len) {
+ Slave_->Write(buf, len);
+
+ Count_ += len;
+}
diff --git a/util/stream/length.h b/util/stream/length.h
new file mode 100644
index 0000000000..4d508ae24d
--- /dev/null
+++ b/util/stream/length.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include "input.h"
+#include "output.h"
+
+#include <util/generic/utility.h>
+
+/**
+ * Proxy input stream that can read a limited number of characters from a slave
+ * stream.
+ *
+ * This can be useful for breaking up the slave stream into small chunks and
+ * treat these as separate streams.
+ */
+class TLengthLimitedInput: public IInputStream {
+public:
+ inline TLengthLimitedInput(IInputStream* slave, ui64 length) noexcept
+ : Slave_(slave)
+ , Length_(length)
+ {
+ }
+
+ ~TLengthLimitedInput() override = default;
+
+ inline ui64 Left() const noexcept {
+ return Length_;
+ }
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+
+private:
+ IInputStream* Slave_;
+ ui64 Length_;
+};
+
+/**
+ * Proxy input stream that counts the number of characters read.
+ */
+class TCountingInput: public IInputStream {
+public:
+ inline TCountingInput(IInputStream* slave) noexcept
+ : Slave_(slave)
+ , Count_()
+ {
+ }
+
+ ~TCountingInput() override = default;
+
+ /**
+ * \returns The total number of characters read from
+ * this stream.
+ */
+ inline ui64 Counter() const noexcept {
+ return Count_;
+ }
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+ size_t DoReadTo(TString& st, char ch) override;
+ ui64 DoReadAll(IOutputStream& out) override;
+
+private:
+ IInputStream* Slave_;
+ ui64 Count_;
+};
+
+/**
+ * Proxy output stream that counts the number of characters written.
+ */
+class TCountingOutput: public IOutputStream {
+public:
+ inline TCountingOutput(IOutputStream* slave) noexcept
+ : Slave_(slave)
+ , Count_()
+ {
+ }
+
+ ~TCountingOutput() override = default;
+
+ TCountingOutput(TCountingOutput&&) noexcept = default;
+ TCountingOutput& operator=(TCountingOutput&&) noexcept = default;
+
+ /**
+ * \returns The total number of characters written
+ * into this stream.
+ */
+ inline ui64 Counter() const noexcept {
+ return Count_;
+ }
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+
+private:
+ IOutputStream* Slave_;
+ ui64 Count_;
+};
diff --git a/util/stream/length_ut.cpp b/util/stream/length_ut.cpp
new file mode 100644
index 0000000000..8968448954
--- /dev/null
+++ b/util/stream/length_ut.cpp
@@ -0,0 +1,52 @@
+#include "length.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/string.h>
+
+Y_UNIT_TEST_SUITE(TestLengthIO) {
+ Y_UNIT_TEST(TestLengthLimitedInput) {
+ char buf[16];
+
+ TStringStream s1("abcd");
+ TLengthLimitedInput l1(&s1, 2);
+ UNIT_ASSERT_VALUES_EQUAL(l1.Load(buf, 3), 2);
+ UNIT_ASSERT_VALUES_EQUAL(l1.Read(buf, 1), 0);
+ }
+
+ Y_UNIT_TEST(TestCountingInput) {
+ char buf[16];
+
+ TStringStream s1("abc\ndef\n");
+ TCountingInput l1(&s1);
+
+ TString s;
+ l1.ReadLine(s);
+ UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 4);
+
+ l1.Load(buf, 1);
+ UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 5);
+
+ l1.Skip(1);
+ UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 6);
+
+ l1.ReadLine(s);
+ UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 8);
+ }
+
+ Y_UNIT_TEST(TestCountingOutput) {
+ TStringStream s1;
+ TCountingOutput l1(&s1);
+
+ l1.Write('1');
+ UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 1);
+
+ l1.Write(TString("abcd"));
+ UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 5);
+
+ TString buf("aaa");
+ IOutputStream::TPart parts[] = {{buf.data(), buf.size()}, {buf.data(), buf.size()}, {buf.data(), buf.size()}};
+ l1.Write(parts, 3);
+ UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 14);
+ }
+}
diff --git a/util/stream/mem.cpp b/util/stream/mem.cpp
new file mode 100644
index 0000000000..22a3339e27
--- /dev/null
+++ b/util/stream/mem.cpp
@@ -0,0 +1,65 @@
+#include "mem.h"
+
+#include <util/generic/yexception.h>
+
+TMemoryInput::TMemoryInput() noexcept
+ : Buf_(nullptr)
+ , Len_(0)
+{
+}
+
+TMemoryInput::TMemoryInput(const void* buf, size_t len) noexcept
+ : Buf_((const char*)buf)
+ , Len_(len)
+{
+}
+
+TMemoryInput::TMemoryInput(const TStringBuf buf) noexcept
+ : Buf_(buf.data())
+ , Len_(buf.size())
+{
+}
+
+TMemoryInput::~TMemoryInput() = default;
+
+size_t TMemoryInput::DoNext(const void** ptr, size_t len) {
+ len = Min(Len_, len);
+
+ *ptr = Buf_;
+ Len_ -= len;
+ Buf_ += len;
+ return len;
+}
+
+void TMemoryInput::DoUndo(size_t len) {
+ Len_ += len;
+ Buf_ -= len;
+}
+
+TMemoryOutput::~TMemoryOutput() = default;
+
+size_t TMemoryOutput::DoNext(void** ptr) {
+ Y_ENSURE(Buf_ < End_, TStringBuf("memory output stream exhausted"));
+ *ptr = Buf_;
+ size_t bufferSize = End_ - Buf_;
+ Buf_ = End_;
+
+ return bufferSize;
+}
+
+void TMemoryOutput::DoUndo(size_t len) {
+ Buf_ -= len;
+}
+
+void TMemoryOutput::DoWrite(const void* buf, size_t len) {
+ char* end = Buf_ + len;
+ Y_ENSURE(end <= End_, TStringBuf("memory output stream exhausted"));
+
+ memcpy(Buf_, buf, len);
+ Buf_ = end;
+}
+
+void TMemoryOutput::DoWriteC(char c) {
+ Y_ENSURE(Buf_ < End_, TStringBuf("memory output stream exhausted"));
+ *Buf_++ = c;
+}
diff --git a/util/stream/mem.h b/util/stream/mem.h
new file mode 100644
index 0000000000..18a5d46772
--- /dev/null
+++ b/util/stream/mem.h
@@ -0,0 +1,255 @@
+#pragma once
+
+#include "zerocopy.h"
+#include "zerocopy_output.h"
+
+#include <util/generic/strbuf.h>
+
+/**
+ * @addtogroup Streams_Memory
+ * @{
+ */
+
+/**
+ * Input stream that reads data from a memory block.
+ */
+class TMemoryInput: public IZeroCopyInputFastReadTo {
+public:
+ TMemoryInput() noexcept;
+
+ /**
+ * Constructs a stream that reads from the provided memory block. It's up
+ * to the user to make sure that the memory block doesn't get freed while
+ * this stream is in use.
+ *
+ * @param buf Memory block to use.
+ * @param len Size of the memory block.
+ */
+ TMemoryInput(const void* buf, size_t len) noexcept;
+ explicit TMemoryInput(const TStringBuf buf) noexcept;
+ ~TMemoryInput() override;
+
+ TMemoryInput(const TMemoryInput& other) noexcept
+ : IZeroCopyInputFastReadTo()
+ , Buf_(other.Buf_)
+ , Len_(other.Len_)
+ {
+ }
+
+ TMemoryInput& operator=(const TMemoryInput& other) noexcept {
+ if (this != &other) {
+ Buf_ = other.Buf_;
+ Len_ = other.Len_;
+ }
+
+ return *this;
+ }
+
+ TMemoryInput(TMemoryInput&&) noexcept = default;
+ TMemoryInput& operator=(TMemoryInput&&) noexcept = default;
+
+ /**
+ * Initializes this stream with a new memory block. It's up to the
+ * user to make sure that the memory block doesn't get freed while this
+ * stream is in use.
+ *
+ * @param buf New memory block to use.
+ * @param len Size of the new memory block.
+ */
+ void Reset(const void* buf, size_t len) noexcept {
+ Buf_ = (const char*)buf;
+ Len_ = len;
+ }
+
+ /**
+ * @returns Whether there is more data in the stream.
+ */
+ bool Exhausted() const noexcept {
+ return !Avail();
+ }
+
+ /**
+ * @returns Number of bytes available in the stream.
+ */
+ size_t Avail() const noexcept {
+ return Len_;
+ }
+
+ /**
+ * @returns Current read position in the memory block
+ * used by this stream.
+ */
+ const char* Buf() const noexcept {
+ return Buf_;
+ }
+
+ /**
+ * Initializes this stream with a next chunk extracted from the given zero
+ * copy stream.
+ *
+ * @param stream Zero copy stream to initialize from.
+ */
+ void Fill(IZeroCopyInput* stream) {
+ Len_ = stream->Next(&Buf_);
+ if (!Len_) {
+ Reset(nullptr, 0);
+ }
+ }
+
+private:
+ size_t DoNext(const void** ptr, size_t len) override;
+ void DoUndo(size_t len) override;
+
+private:
+ const char* Buf_;
+ size_t Len_;
+};
+
+/**
+ * Output stream that writes data to a memory block.
+ */
+class TMemoryOutput: public IZeroCopyOutput {
+public:
+ /**
+ * Constructs a stream that writes to the provided memory block. It's up
+ * to the user to make sure that the memory block doesn't get freed while
+ * this stream is in use.
+ *
+ * @param buf Memory block to use.
+ * @param len Size of the memory block.
+ */
+ TMemoryOutput(void* buf, size_t len) noexcept
+ : Buf_(static_cast<char*>(buf))
+ , End_(Buf_ + len)
+ {
+ }
+ ~TMemoryOutput() override;
+
+ TMemoryOutput(TMemoryOutput&&) noexcept = default;
+ TMemoryOutput& operator=(TMemoryOutput&&) noexcept = default;
+
+ /**
+ * Initializes this stream with a new memory block. It's up to the
+ * user to make sure that the memory block doesn't get freed while this
+ * stream is in use.
+ *
+ * @param buf New memory block to use.
+ * @param len Size of the new memory block.
+ */
+ inline void Reset(void* buf, size_t len) noexcept {
+ Buf_ = static_cast<char*>(buf);
+ End_ = Buf_ + len;
+ }
+
+ /**
+ * @returns Whether there is more space in the
+ * stream for writing.
+ */
+ inline bool Exhausted() const noexcept {
+ return !Avail();
+ }
+
+ /**
+ * @returns Number of bytes available for writing
+ * in the stream.
+ */
+ inline size_t Avail() const noexcept {
+ return End_ - Buf_;
+ }
+
+ /**
+ * @returns Current write position in the memory block
+ * used by this stream.
+ */
+ inline char* Buf() const noexcept {
+ return Buf_;
+ }
+
+ /**
+ * @returns Pointer to the end of the memory block
+ * used by this stream.
+ */
+ char* End() const {
+ return End_;
+ }
+
+private:
+ size_t DoNext(void** ptr) override;
+ void DoUndo(size_t len) override;
+ void DoWrite(const void* buf, size_t len) override;
+ void DoWriteC(char c) override;
+
+protected:
+ char* Buf_;
+ char* End_;
+};
+
+/**
+ * Memory output stream that supports changing the position of the
+ * write pointer.
+ *
+ * @see TMemoryOutput
+ */
+class TMemoryWriteBuffer: public TMemoryOutput {
+public:
+ TMemoryWriteBuffer(void* buf, size_t len)
+ : TMemoryOutput(buf, len)
+ , Beg_(Buf_)
+ {
+ }
+
+ void Reset(void* buf, size_t len) {
+ TMemoryOutput::Reset(buf, len);
+ Beg_ = Buf_;
+ }
+
+ size_t Len() const {
+ return Buf() - Beg();
+ }
+
+ size_t Empty() const {
+ return Buf() == Beg();
+ }
+
+ /**
+ * @returns Data that has been written into this
+ * stream as a string.
+ */
+ TStringBuf Str() const {
+ return TStringBuf(Beg(), Buf());
+ }
+
+ char* Beg() const {
+ return Beg_;
+ }
+
+ /**
+ * @param ptr New write position for this stream.
+ * Must be inside the memory block that
+ * this stream uses.
+ */
+ void SetPos(char* ptr) {
+ Y_ASSERT(Beg_ <= ptr);
+ SetPosImpl(ptr);
+ }
+
+ /**
+ * @param pos New write position for this stream,
+ * relative to the beginning of the memory
+ * block that this stream uses.
+ */
+ void SetPos(size_t pos) {
+ SetPosImpl(Beg_ + pos);
+ }
+
+protected:
+ void SetPosImpl(char* ptr) {
+ Y_ASSERT(End_ >= ptr);
+ Buf_ = ptr;
+ }
+
+protected:
+ char* Beg_;
+};
+
+/** @} */
diff --git a/util/stream/mem_ut.cpp b/util/stream/mem_ut.cpp
new file mode 100644
index 0000000000..f388ae66ac
--- /dev/null
+++ b/util/stream/mem_ut.cpp
@@ -0,0 +1,78 @@
+#include "mem.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TestMemIO) {
+ Y_UNIT_TEST(TestReadTo) {
+ TString s("0123456789abc");
+ TMemoryInput in(s);
+
+ TString t;
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '7'), 8);
+ UNIT_ASSERT_VALUES_EQUAL(t, "0123456");
+ UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 5);
+ UNIT_ASSERT_VALUES_EQUAL(t, "89abc");
+ }
+
+ Y_UNIT_TEST(NextAndUndo) {
+ char buffer[20];
+ TMemoryOutput output(buffer, sizeof(buffer));
+ char* ptr = nullptr;
+ size_t bufferSize = output.Next(&ptr);
+ UNIT_ASSERT_GE(bufferSize, 1);
+ *ptr = '1';
+ if (bufferSize > 1) {
+ output.Undo(bufferSize - 1);
+ }
+
+ bufferSize = output.Next(&ptr);
+ UNIT_ASSERT_GE(bufferSize, 2);
+ *ptr = '2';
+ *(ptr + 1) = '2';
+ if (bufferSize > 2) {
+ output.Undo(bufferSize - 2);
+ }
+
+ bufferSize = output.Next(&ptr);
+ UNIT_ASSERT_GE(bufferSize, 3);
+ *ptr = '3';
+ *(ptr + 1) = '3';
+ *(ptr + 2) = '3';
+ if (bufferSize > 3) {
+ output.Undo(bufferSize - 3);
+ }
+
+ output.Finish();
+
+ const char* const result = "1"
+ "22"
+ "333";
+ UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result)));
+ }
+
+ Y_UNIT_TEST(Write) {
+ char buffer[20];
+ TMemoryOutput output(buffer, sizeof(buffer));
+ output << "1"
+ << "22"
+ << "333"
+ << "4444"
+ << "55555";
+
+ const char* const result = "1"
+ "22"
+ "333"
+ "4444"
+ "55555";
+ UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result)));
+ }
+
+ Y_UNIT_TEST(WriteChars) {
+ char buffer[20];
+ TMemoryOutput output(buffer, sizeof(buffer));
+ output << '1' << '2' << '3' << '4' << '5' << '6' << '7' << '8' << '9' << '0';
+
+ const char* const result = "1234567890";
+ UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result)));
+ }
+}
diff --git a/util/stream/multi.cpp b/util/stream/multi.cpp
new file mode 100644
index 0000000000..b2354298a0
--- /dev/null
+++ b/util/stream/multi.cpp
@@ -0,0 +1,56 @@
+#include "null.h"
+#include "multi.h"
+
+TMultiInput::TMultiInput(IInputStream* f, IInputStream* s) noexcept
+ : C_(f)
+ , N_(s)
+{
+}
+
+TMultiInput::~TMultiInput() = default;
+
+size_t TMultiInput::DoRead(void* buf, size_t len) {
+ const size_t ret = C_->Read(buf, len);
+
+ if (ret) {
+ return ret;
+ }
+
+ C_ = N_;
+ N_ = &Cnull;
+
+ return C_->Read(buf, len);
+}
+
+size_t TMultiInput::DoReadTo(TString& st, char ch) {
+ size_t ret = C_->ReadTo(st, ch);
+ if (ret == st.size() + 1) { // found a symbol, not eof
+ return ret;
+ }
+
+ C_ = N_;
+ N_ = &Cnull;
+
+ if (ret == 0) {
+ ret += C_->ReadTo(st, ch);
+ } else {
+ TString tmp;
+ ret += C_->ReadTo(tmp, ch);
+ st += tmp;
+ }
+
+ return ret;
+}
+
+size_t TMultiInput::DoSkip(size_t len) {
+ const size_t ret = C_->Skip(len);
+
+ if (ret) {
+ return ret;
+ }
+
+ C_ = N_;
+ N_ = &Cnull;
+
+ return C_->Skip(len);
+}
diff --git a/util/stream/multi.h b/util/stream/multi.h
new file mode 100644
index 0000000000..8bfd462d99
--- /dev/null
+++ b/util/stream/multi.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include "input.h"
+
+/**
+ * @addtogroup Streams_Multi
+ * @{
+ */
+
+/**
+ * A proxy input stream that concatenates two slave streams into one.
+ */
+class TMultiInput: public IInputStream {
+public:
+ TMultiInput(IInputStream* f, IInputStream* s) noexcept;
+ ~TMultiInput() override;
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+ size_t DoReadTo(TString& st, char ch) override;
+
+private:
+ IInputStream* C_;
+ IInputStream* N_;
+};
+
+/**
+ * See also "util/stream/tee.h" for multi output.
+ */
+
+/** @} */
diff --git a/util/stream/multi_ut.cpp b/util/stream/multi_ut.cpp
new file mode 100644
index 0000000000..fc2553b533
--- /dev/null
+++ b/util/stream/multi_ut.cpp
@@ -0,0 +1,51 @@
+#include "mem.h"
+#include "multi.h"
+#include "str.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TestMultiInput) {
+ struct TTestCase {
+ TMemoryInput Input1;
+ TMemoryInput Input2;
+ TMultiInput MultiInput;
+ TTestCase(const TStringBuf in1, const TStringBuf in2)
+ : Input1(in1)
+ , Input2(in2)
+ , MultiInput(&Input1, &Input2)
+ {
+ }
+ void TestReadToResult(char c, size_t expectedRetval,
+ const TString& expectedValue,
+ const TString& initValue = "") {
+ TString t = initValue;
+ UNIT_ASSERT_VALUES_EQUAL(MultiInput.ReadTo(t, c), expectedRetval);
+ UNIT_ASSERT_VALUES_EQUAL(t, expectedValue);
+ }
+ };
+
+ Y_UNIT_TEST(TestReadTo) {
+ TString t;
+
+ TTestCase simpleCase("0123456789abc", "defghijk");
+ simpleCase.TestReadToResult('7', 8, "0123456");
+ simpleCase.TestReadToResult('f', 8, "89abcde");
+ simpleCase.TestReadToResult('z', 5, "ghijk");
+ }
+
+ Y_UNIT_TEST(TestReadToBetweenStreams) {
+ TTestCase case1("0123456789abc", "defghijk");
+ case1.TestReadToResult('c', 13, "0123456789ab");
+ case1.TestReadToResult('k', 8, "defghij");
+ case1.TestReadToResult('z', 0, "TRASH", "TRASH");
+
+ TTestCase case2("0123456789abc", "defghijk");
+ case2.TestReadToResult('d', 14, "0123456789abc");
+ case2.TestReadToResult('j', 6, "efghi");
+ case2.TestReadToResult('k', 1, "", "TRASH");
+
+ TTestCase case3("0123456789abc", "defghijk");
+ case3.TestReadToResult('e', 15, "0123456789abcd");
+ case3.TestReadToResult('j', 5, "fghi");
+ case3.TestReadToResult('k', 1, "", "TRASH");
+ }
+}
diff --git a/util/stream/null.cpp b/util/stream/null.cpp
new file mode 100644
index 0000000000..4e8b298145
--- /dev/null
+++ b/util/stream/null.cpp
@@ -0,0 +1,36 @@
+#include "null.h"
+
+#include <util/generic/singleton.h>
+
+TNullIO& NPrivate::StdNullStream() noexcept {
+ return *SingletonWithPriority<TNullIO, 4>();
+}
+
+TNullInput::TNullInput() noexcept {
+}
+
+TNullInput::~TNullInput() = default;
+
+size_t TNullInput::DoRead(void*, size_t) {
+ return 0;
+}
+
+size_t TNullInput::DoSkip(size_t) {
+ return 0;
+}
+
+size_t TNullInput::DoNext(const void**, size_t) {
+ return 0;
+}
+
+TNullOutput::TNullOutput() noexcept = default;
+
+TNullOutput::~TNullOutput() = default;
+
+void TNullOutput::DoWrite(const void* /*buf*/, size_t /*len*/) {
+}
+
+TNullIO::TNullIO() noexcept {
+}
+
+TNullIO::~TNullIO() = default;
diff --git a/util/stream/null.h b/util/stream/null.h
new file mode 100644
index 0000000000..8c335a9a78
--- /dev/null
+++ b/util/stream/null.h
@@ -0,0 +1,61 @@
+#pragma once
+
+#include "zerocopy.h"
+#include "output.h"
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Null input stream. Does nothing, contains no data.
+ */
+class TNullInput: public IZeroCopyInput {
+public:
+ TNullInput() noexcept;
+ ~TNullInput() override;
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+ size_t DoNext(const void** ptr, size_t len) override;
+};
+
+/**
+ * Null output stream. Just ignores whatever is written into it.
+ */
+class TNullOutput: public IOutputStream {
+public:
+ TNullOutput() noexcept;
+ ~TNullOutput() override;
+
+ TNullOutput(TNullOutput&&) noexcept = default;
+ TNullOutput& operator=(TNullOutput&&) noexcept = default;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+};
+
+/**
+ * Null input-output stream.
+ *
+ * @see TNullInput
+ * @see TNullOutput
+ */
+class TNullIO: public TNullInput, public TNullOutput {
+public:
+ TNullIO() noexcept;
+ ~TNullIO() override;
+};
+
+namespace NPrivate {
+ TNullIO& StdNullStream() noexcept;
+}
+
+/**
+ * Standard null stream.
+ */
+#define Cnull (::NPrivate::StdNullStream())
+
+/** @} */
diff --git a/util/stream/output.cpp b/util/stream/output.cpp
new file mode 100644
index 0000000000..db81b81b70
--- /dev/null
+++ b/util/stream/output.cpp
@@ -0,0 +1,428 @@
+#include "output.h"
+
+#include <util/string/cast.h>
+#include "format.h"
+#include <util/memory/tempbuf.h>
+#include <util/generic/singleton.h>
+#include <util/generic/yexception.h>
+#include <util/charset/utf8.h>
+#include <util/charset/wide.h>
+
+#if defined(_android_)
+ #include <util/system/dynlib.h>
+ #include <util/system/guard.h>
+ #include <util/system/mutex.h>
+ #include <android/log.h>
+#endif
+
+#include <cerrno>
+#include <string>
+#include <string_view>
+#include <cstdio>
+
+#if defined(_win_)
+ #include <io.h>
+#endif
+
+constexpr size_t MAX_UTF8_BYTES = 4; // UTF-8-encoded code point takes between 1 and 4 bytes
+
+IOutputStream::IOutputStream() noexcept = default;
+
+IOutputStream::~IOutputStream() = default;
+
+void IOutputStream::DoFlush() {
+ /*
+ * do nothing
+ */
+}
+
+void IOutputStream::DoFinish() {
+ Flush();
+}
+
+void IOutputStream::DoWriteV(const TPart* parts, size_t count) {
+ for (size_t i = 0; i < count; ++i) {
+ const TPart& part = parts[i];
+
+ DoWrite(part.buf, part.len);
+ }
+}
+
+void IOutputStream::DoWriteC(char ch) {
+ DoWrite(&ch, 1);
+}
+
+template <>
+void Out<wchar16>(IOutputStream& o, wchar16 ch) {
+ const wchar32 w32ch = ReadSymbol(&ch, &ch + 1);
+ size_t length;
+ unsigned char buffer[MAX_UTF8_BYTES];
+ WriteUTF8Char(w32ch, length, buffer);
+ o.Write(buffer, length);
+}
+
+template <>
+void Out<wchar32>(IOutputStream& o, wchar32 ch) {
+ size_t length;
+ unsigned char buffer[MAX_UTF8_BYTES];
+ WriteUTF8Char(ch, length, buffer);
+ o.Write(buffer, length);
+}
+
+static void WriteString(IOutputStream& o, const wchar16* w, size_t n) {
+ const size_t buflen = (n * MAX_UTF8_BYTES); // * 4 because the conversion functions can convert unicode character into maximum 4 bytes of UTF8
+ TTempBuf buffer(buflen + 1);
+ char* const data = buffer.Data();
+ size_t written = 0;
+ WideToUTF8(w, n, data, written);
+ data[written] = 0;
+ o.Write(data, written);
+}
+
+static void WriteString(IOutputStream& o, const wchar32* w, size_t n) {
+ const size_t buflen = (n * MAX_UTF8_BYTES); // * 4 because the conversion functions can convert unicode character into maximum 4 bytes of UTF8
+ TTempBuf buffer(buflen + 1);
+ char* const data = buffer.Data();
+ size_t written = 0;
+ WideToUTF8(w, n, data, written);
+ data[written] = 0;
+ o.Write(data, written);
+}
+
+template <>
+void Out<TString>(IOutputStream& o, const TString& p) {
+ o.Write(p.data(), p.size());
+}
+
+template <>
+void Out<std::string>(IOutputStream& o, const std::string& p) {
+ o.Write(p.data(), p.length());
+}
+
+template <>
+void Out<std::string_view>(IOutputStream& o, const std::string_view& p) {
+ o.Write(p.data(), p.length());
+}
+
+template <>
+void Out<std::u16string_view>(IOutputStream& o, const std::u16string_view& p) {
+ WriteString(o, p.data(), p.length());
+}
+
+template <>
+void Out<std::u32string_view>(IOutputStream& o, const std::u32string_view& p) {
+ WriteString(o, p.data(), p.length());
+}
+
+template <>
+void Out<TStringBuf>(IOutputStream& o, const TStringBuf& p) {
+ o.Write(p.data(), p.length());
+}
+
+template <>
+void Out<TWtringBuf>(IOutputStream& o, const TWtringBuf& p) {
+ WriteString(o, p.data(), p.length());
+}
+
+template <>
+void Out<TUtf32StringBuf>(IOutputStream& o, const TUtf32StringBuf& p) {
+ WriteString(o, p.data(), p.length());
+}
+
+template <>
+void Out<const wchar16*>(IOutputStream& o, const wchar16* w) {
+ if (w) {
+ WriteString(o, w, std::char_traits<wchar16>::length(w));
+ } else {
+ o.Write("(null)");
+ }
+}
+
+template <>
+void Out<const wchar32*>(IOutputStream& o, const wchar32* w) {
+ if (w) {
+ WriteString(o, w, std::char_traits<wchar32>::length(w));
+ } else {
+ o.Write("(null)");
+ }
+}
+
+template <>
+void Out<TUtf16String>(IOutputStream& o, const TUtf16String& w) {
+ WriteString(o, w.c_str(), w.size());
+}
+
+template <>
+void Out<TUtf32String>(IOutputStream& o, const TUtf32String& w) {
+ WriteString(o, w.c_str(), w.size());
+}
+
+#define DEF_CONV_DEFAULT(type) \
+ template <> \
+ void Out<type>(IOutputStream & o, type p) { \
+ o << ToString(p); \
+ }
+
+#define DEF_CONV_CHR(type) \
+ template <> \
+ void Out<type>(IOutputStream & o, type p) { \
+ o.Write((char)p); \
+ }
+
+#define DEF_CONV_NUM(type, len) \
+ template <> \
+ void Out<type>(IOutputStream & o, type p) { \
+ char buf[len]; \
+ o.Write(buf, ToString(p, buf, sizeof(buf))); \
+ } \
+ \
+ template <> \
+ void Out<volatile type>(IOutputStream & o, volatile type p) { \
+ Out<type>(o, p); \
+ }
+
+DEF_CONV_NUM(bool, 64)
+
+DEF_CONV_CHR(char)
+DEF_CONV_CHR(signed char)
+DEF_CONV_CHR(unsigned char)
+
+DEF_CONV_NUM(signed short, 64)
+DEF_CONV_NUM(signed int, 64)
+DEF_CONV_NUM(signed long int, 64)
+DEF_CONV_NUM(signed long long int, 64)
+
+DEF_CONV_NUM(unsigned short, 64)
+DEF_CONV_NUM(unsigned int, 64)
+DEF_CONV_NUM(unsigned long int, 64)
+DEF_CONV_NUM(unsigned long long int, 64)
+
+DEF_CONV_NUM(float, 512)
+DEF_CONV_NUM(double, 512)
+DEF_CONV_NUM(long double, 512)
+
+#if !defined(_YNDX_LIBCXX_ENABLE_VECTOR_BOOL_COMPRESSION) || (_YNDX_LIBCXX_ENABLE_VECTOR_BOOL_COMPRESSION == 1)
+// TODO: acknowledge std::bitset::reference for both libc++ and libstdc++
+template <>
+void Out<typename std::vector<bool>::reference>(IOutputStream& o, const std::vector<bool>::reference& bit) {
+ return Out<bool>(o, static_cast<bool>(bit));
+}
+#endif
+
+#ifndef TSTRING_IS_STD_STRING
+template <>
+void Out<TBasicCharRef<TString>>(IOutputStream& o, const TBasicCharRef<TString>& c) {
+ o << static_cast<char>(c);
+}
+
+template <>
+void Out<TBasicCharRef<TUtf16String>>(IOutputStream& o, const TBasicCharRef<TUtf16String>& c) {
+ o << static_cast<wchar16>(c);
+}
+
+template <>
+void Out<TBasicCharRef<TUtf32String>>(IOutputStream& o, const TBasicCharRef<TUtf32String>& c) {
+ o << static_cast<wchar32>(c);
+}
+#endif
+
+template <>
+void Out<const void*>(IOutputStream& o, const void* t) {
+ o << Hex(size_t(t));
+}
+
+template <>
+void Out<void*>(IOutputStream& o, void* t) {
+ Out<const void*>(o, t);
+}
+
+using TNullPtr = decltype(nullptr);
+
+template <>
+void Out<TNullPtr>(IOutputStream& o, TTypeTraits<TNullPtr>::TFuncParam) {
+ o << TStringBuf("nullptr");
+}
+
+#if defined(_android_)
+namespace {
+ class TAndroidStdIOStreams {
+ public:
+ TAndroidStdIOStreams()
+ : LogLibrary("liblog.so")
+ , LogFuncPtr((TLogFuncPtr)LogLibrary.Sym("__android_log_write"))
+ , Out(LogFuncPtr)
+ , Err(LogFuncPtr)
+ {
+ }
+
+ public:
+ using TLogFuncPtr = void (*)(int, const char*, const char*);
+
+ class TAndroidStdOutput: public IOutputStream {
+ public:
+ inline TAndroidStdOutput(TLogFuncPtr logFuncPtr) noexcept
+ : Buffer()
+ , LogFuncPtr(logFuncPtr)
+ {
+ }
+
+ virtual ~TAndroidStdOutput() {
+ }
+
+ private:
+ virtual void DoWrite(const void* buf, size_t len) override {
+ with_lock (BufferMutex) {
+ Buffer.Write(buf, len);
+ }
+ }
+
+ virtual void DoFlush() override {
+ with_lock (BufferMutex) {
+ LogFuncPtr(ANDROID_LOG_DEBUG, GetTag(), Buffer.Data());
+ Buffer.Clear();
+ }
+ }
+
+ virtual const char* GetTag() const = 0;
+
+ private:
+ TMutex BufferMutex;
+ TStringStream Buffer;
+ TLogFuncPtr LogFuncPtr;
+ };
+
+ class TStdErr: public TAndroidStdOutput {
+ public:
+ TStdErr(TLogFuncPtr logFuncPtr)
+ : TAndroidStdOutput(logFuncPtr)
+ {
+ }
+
+ virtual ~TStdErr() {
+ }
+
+ private:
+ virtual const char* GetTag() const override {
+ return "stderr";
+ }
+ };
+
+ class TStdOut: public TAndroidStdOutput {
+ public:
+ TStdOut(TLogFuncPtr logFuncPtr)
+ : TAndroidStdOutput(logFuncPtr)
+ {
+ }
+
+ virtual ~TStdOut() {
+ }
+
+ private:
+ virtual const char* GetTag() const override {
+ return "stdout";
+ }
+ };
+
+ static bool Enabled;
+ TDynamicLibrary LogLibrary; // field order is important, see constructor
+ TLogFuncPtr LogFuncPtr;
+ TStdOut Out;
+ TStdErr Err;
+
+ static inline TAndroidStdIOStreams& Instance() {
+ return *SingletonWithPriority<TAndroidStdIOStreams, 4>();
+ }
+ };
+
+ bool TAndroidStdIOStreams::Enabled = false;
+}
+#endif // _android_
+
+namespace {
+ class TStdOutput: public IOutputStream {
+ public:
+ inline TStdOutput(FILE* f) noexcept
+ : F_(f)
+ {
+ }
+
+ ~TStdOutput() override = default;
+
+ private:
+ void DoWrite(const void* buf, size_t len) override {
+ if (len != fwrite(buf, 1, len, F_)) {
+#if defined(_win_)
+ // On Windows, if 'F_' is console -- 'fwrite' returns count of written characters.
+ // If, for example, console output codepage is UTF-8, then returned value is
+ // not equal to 'len'. So, we ignore some 'errno' values...
+ if ((errno == 0 || errno == EINVAL || errno == EILSEQ) && _isatty(fileno(F_))) {
+ return;
+ }
+#endif
+ ythrow TSystemError() << "write failed";
+ }
+ }
+
+ void DoFlush() override {
+ if (fflush(F_) != 0) {
+ ythrow TSystemError() << "fflush failed";
+ }
+ }
+
+ private:
+ FILE* F_;
+ };
+
+ struct TStdIOStreams {
+ struct TStdErr: public TStdOutput {
+ inline TStdErr()
+ : TStdOutput(stderr)
+ {
+ }
+
+ ~TStdErr() override = default;
+ };
+
+ struct TStdOut: public TStdOutput {
+ inline TStdOut()
+ : TStdOutput(stdout)
+ {
+ }
+
+ ~TStdOut() override = default;
+ };
+
+ TStdOut Out;
+ TStdErr Err;
+
+ static inline TStdIOStreams& Instance() {
+ return *SingletonWithPriority<TStdIOStreams, 4>();
+ }
+ };
+}
+
+IOutputStream& NPrivate::StdErrStream() noexcept {
+#if defined(_android_)
+ if (TAndroidStdIOStreams::Enabled) {
+ return TAndroidStdIOStreams::Instance().Err;
+ }
+#endif
+ return TStdIOStreams::Instance().Err;
+}
+
+IOutputStream& NPrivate::StdOutStream() noexcept {
+#if defined(_android_)
+ if (TAndroidStdIOStreams::Enabled) {
+ return TAndroidStdIOStreams::Instance().Out;
+ }
+#endif
+ return TStdIOStreams::Instance().Out;
+}
+
+void RedirectStdioToAndroidLog(bool redirect) {
+#if defined(_android_)
+ TAndroidStdIOStreams::Enabled = redirect;
+#else
+ Y_UNUSED(redirect);
+#endif
+}
diff --git a/util/stream/output.h b/util/stream/output.h
new file mode 100644
index 0000000000..00eef50b95
--- /dev/null
+++ b/util/stream/output.h
@@ -0,0 +1,304 @@
+#pragma once
+
+#include "fwd.h"
+#include "labeled.h"
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/string.h>
+#include <util/generic/strbuf.h>
+#include <util/generic/typetraits.h>
+
+#include <type_traits>
+
+/**
+ * @addtogroup Streams_Base
+ * @{
+ */
+
+/**
+ * Abstract output stream.
+ */
+class IOutputStream: public TNonCopyable {
+public:
+ /**
+ * Data block for output.
+ */
+ struct TPart {
+ inline TPart(const void* Buf, size_t Len) noexcept
+ : buf(Buf)
+ , len(Len)
+ {
+ }
+
+ inline TPart(const TStringBuf s) noexcept
+ : buf(s.data())
+ , len(s.size())
+ {
+ }
+
+ inline TPart() noexcept
+ : buf(nullptr)
+ , len(0)
+ {
+ }
+
+ inline ~TPart() = default;
+
+ static inline TPart CrLf() noexcept {
+ return TPart("\r\n", 2);
+ }
+
+ const void* buf;
+ size_t len;
+ };
+
+ IOutputStream() noexcept;
+ virtual ~IOutputStream();
+
+ IOutputStream(IOutputStream&&) noexcept {
+ }
+
+ IOutputStream& operator=(IOutputStream&&) noexcept {
+ return *this;
+ };
+
+ /**
+ * Writes into this stream.
+ *
+ * @param buf Data to write.
+ * @param len Number of bytes to write.
+ */
+ inline void Write(const void* buf, size_t len) {
+ if (len) {
+ DoWrite(buf, len);
+ }
+ }
+
+ /**
+ * Writes a string into this stream.
+ *
+ * @param st String to write.
+ */
+ inline void Write(const TStringBuf st) {
+ Write(st.data(), st.size());
+ }
+
+ /**
+ * Writes several data blocks into this stream.
+ *
+ * @param parts Pointer to the start of the data blocks
+ * array.
+ * @param count Number of data blocks to write.
+ */
+ inline void Write(const TPart* parts, size_t count) {
+ if (count > 1) {
+ DoWriteV(parts, count);
+ } else if (count) {
+ DoWrite(parts->buf, parts->len);
+ }
+ }
+
+ /**
+ * Writes a single character into this stream.
+ *
+ * @param ch Character to write.
+ */
+ inline void Write(char ch) {
+ DoWriteC(ch);
+ }
+
+ /**
+ * Flushes this stream's buffer, if any.
+ *
+ * Note that this can also be done with a `Flush` manipulator:
+ * @code
+ * stream << "some string" << Flush;
+ * @endcode
+ */
+ inline void Flush() {
+ DoFlush();
+ }
+
+ /**
+ * Flushes and closes this stream. No more data can be written into a stream
+ * once it's closed.
+ */
+ inline void Finish() {
+ DoFinish();
+ }
+
+protected:
+ /**
+ * Writes into this stream.
+ *
+ * @param buf Data to write.
+ * @param len Number of bytes to write.
+ * @throws yexception If IO error occurs.
+ */
+ virtual void DoWrite(const void* buf, size_t len) = 0;
+
+ /**
+ * Writes several data blocks into this stream.
+ *
+ * @param parts Pointer to the start of the data blocks
+ * array.
+ * @param count Number of data blocks to write.
+ * @throws yexception If IO error occurs.
+ */
+ virtual void DoWriteV(const TPart* parts, size_t count);
+
+ /**
+ * Writes a single character into this stream. Can be overridden with a faster implementation.
+ *
+ * @param ch Character to write.
+ */
+ virtual void DoWriteC(char ch);
+
+ /**
+ * Flushes this stream's buffer, if any.
+ *
+ * @throws yexception If IO error occurs.
+ */
+ virtual void DoFlush();
+
+ /**
+ * Flushes and closes this stream. No more data can be written into a stream
+ * once it's closed.
+ *
+ * @throws yexception If IO error occurs.
+ */
+ virtual void DoFinish();
+};
+
+/**
+ * `operator<<` for `IOutputStream` by default delegates to this function.
+ *
+ * Note that while `operator<<` uses overloading (and thus argument-dependent
+ * lookup), `Out` uses template specializations. This makes it possible to
+ * have a single `Out` declaration, and then just provide specializations in
+ * cpp files, letting the linker figure everything else out. This approach
+ * reduces compilation times.
+ *
+ * However, if the flexibility of overload resolution is needed, then one should
+ * just overload `operator<<`.
+ *
+ * @param out Output stream to write into.
+ * @param value Value to write.
+ */
+template <class T>
+void Out(IOutputStream& out, typename TTypeTraits<T>::TFuncParam value);
+
+#define Y_DECLARE_OUT_SPEC(MODIF, T, stream, value) \
+ template <> \
+ MODIF void Out<T>(IOutputStream & stream, TTypeTraits<T>::TFuncParam value)
+
+template <>
+inline void Out<const char*>(IOutputStream& o, const char* t) {
+ if (t) {
+ o.Write(t);
+ } else {
+ o.Write("(null)");
+ }
+}
+
+template <>
+void Out<const wchar16*>(IOutputStream& o, const wchar16* w);
+
+template <>
+void Out<const wchar32*>(IOutputStream& o, const wchar32* w);
+
+static inline IOutputStream& operator<<(IOutputStream& o, TStreamManipulator m) {
+ m(o);
+
+ return o;
+}
+
+static inline IOutputStream& operator<<(IOutputStream& o, const char* t) {
+ Out<const char*>(o, t);
+
+ return o;
+}
+
+static inline IOutputStream& operator<<(IOutputStream& o, char* t) {
+ Out<const char*>(o, t);
+
+ return o;
+}
+
+template <class T>
+static inline std::enable_if_t<std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, T t) {
+ Out<T>(o, t);
+
+ return o;
+}
+
+template <class T>
+static inline std::enable_if_t<!std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, const T& t) {
+ Out<T>(o, t);
+
+ return o;
+}
+
+static inline IOutputStream& operator<<(IOutputStream& o, const wchar16* t) {
+ Out<const wchar16*>(o, t);
+ return o;
+}
+
+static inline IOutputStream& operator<<(IOutputStream& o, wchar16* t) {
+ Out<const wchar16*>(o, t);
+ return o;
+}
+
+static inline IOutputStream& operator<<(IOutputStream& o, const wchar32* t) {
+ Out<const wchar32*>(o, t);
+ return o;
+}
+
+static inline IOutputStream& operator<<(IOutputStream& o, wchar32* t) {
+ Out<const wchar32*>(o, t);
+ return o;
+}
+
+namespace NPrivate {
+ IOutputStream& StdOutStream() noexcept;
+ IOutputStream& StdErrStream() noexcept;
+}
+
+/**
+ * Standard output stream.
+ */
+#define Cout (::NPrivate::StdOutStream())
+
+/**
+ * Standard error stream.
+ */
+#define Cerr (::NPrivate::StdErrStream())
+
+/**
+ * Standard log stream.
+ */
+#define Clog Cerr
+
+/**
+ * End-of-line output manipulator, basically the same as `std::endl`.
+ */
+static inline void Endl(IOutputStream& o) {
+ (o << '\n').Flush();
+}
+
+/**
+ * Flushing stream manipulator, basically the same as `std::flush`.
+ */
+static inline void Flush(IOutputStream& o) {
+ o.Flush();
+}
+
+/*
+ * Also see format.h for additional manipulators.
+ */
+
+#include "debug.h"
+
+void RedirectStdioToAndroidLog(bool redirect);
+
+/** @} */
diff --git a/util/stream/output.pxd b/util/stream/output.pxd
new file mode 100644
index 0000000000..2fccc26d9b
--- /dev/null
+++ b/util/stream/output.pxd
@@ -0,0 +1,12 @@
+from util.generic.string cimport TStringBuf
+
+
+cdef extern from "<util/stream/output.h>" nogil:
+ cdef cppclass IOutputStream:
+ IOutputStream()
+ void Flush() except+
+ void Finish() except+
+
+ void WriteChar "Write"(char) except+
+ void WriteBuf "Write"(const TStringBuf) except+
+ void Write(const void*, size_t) except+
diff --git a/util/stream/pipe.cpp b/util/stream/pipe.cpp
new file mode 100644
index 0000000000..51be1934a7
--- /dev/null
+++ b/util/stream/pipe.cpp
@@ -0,0 +1,121 @@
+#include "pipe.h"
+
+#include <util/generic/yexception.h>
+
+#include <cstdio>
+#include <cerrno>
+
+class TPipeBase::TImpl {
+public:
+ inline TImpl(const TString& command, const char* mode)
+ : Pipe_(nullptr)
+ {
+#ifndef _freebsd_
+ if (strcmp(mode, "r+") == 0) {
+ ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD";
+ }
+#endif
+ Pipe_ = ::popen(command.data(), mode);
+ if (Pipe_ == nullptr) {
+ ythrow TSystemError() << "failed to open pipe: " << command.Quote();
+ }
+ }
+
+ inline ~TImpl() {
+ if (Pipe_ != nullptr) {
+ ::pclose(Pipe_);
+ }
+ }
+
+public:
+ FILE* Pipe_;
+};
+
+TPipeBase::TPipeBase(const TString& command, const char* mode)
+ : Impl_(new TImpl(command, mode))
+{
+}
+
+TPipeBase::~TPipeBase() = default;
+
+TPipeInput::TPipeInput(const TString& command)
+ : TPipeBase(command, "r")
+{
+}
+
+size_t TPipeInput::DoRead(void* buf, size_t len) {
+ if (Impl_->Pipe_ == nullptr) {
+ return 0;
+ }
+
+ size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_);
+ if (bytesRead == 0) {
+ int exitStatus = ::pclose(Impl_->Pipe_);
+ Impl_->Pipe_ = nullptr;
+ if (exitStatus == -1) {
+ ythrow TSystemError() << "pclose() failed";
+ } else if (exitStatus != 0) {
+ ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
+ }
+ }
+ return bytesRead;
+}
+
+TPipeOutput::TPipeOutput(const TString& command)
+ : TPipeBase(command, "w")
+{
+}
+
+void TPipeOutput::DoWrite(const void* buf, size_t len) {
+ if (Impl_->Pipe_ == nullptr || len != ::fwrite(buf, 1, len, Impl_->Pipe_)) {
+ ythrow TSystemError() << "fwrite failed";
+ }
+}
+
+void TPipeOutput::Close() {
+ int exitStatus = ::pclose(Impl_->Pipe_);
+ Impl_->Pipe_ = nullptr;
+ if (exitStatus == -1) {
+ ythrow TSystemError() << "pclose() failed";
+ } else if (exitStatus != 0) {
+ ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")";
+ }
+}
+
+TPipedBase::TPipedBase(PIPEHANDLE fd)
+ : Handle_(fd)
+{
+}
+
+TPipedBase::~TPipedBase() {
+ if (Handle_.IsOpen()) {
+ Handle_.Close();
+ }
+}
+
+TPipedInput::TPipedInput(PIPEHANDLE fd)
+ : TPipedBase(fd)
+{
+}
+
+TPipedInput::~TPipedInput() = default;
+
+size_t TPipedInput::DoRead(void* buf, size_t len) {
+ if (!Handle_.IsOpen()) {
+ return 0;
+ }
+ return Handle_.Read(buf, len);
+}
+
+TPipedOutput::TPipedOutput(PIPEHANDLE fd)
+ : TPipedBase(fd)
+{
+}
+
+TPipedOutput::~TPipedOutput() = default;
+
+void TPipedOutput::DoWrite(const void* buf, size_t len) {
+ if (!Handle_.IsOpen() || static_cast<ssize_t>(len) != Handle_.Write(buf, len)) {
+ ythrow TSystemError() << "pipe writing failed";
+ }
+}
diff --git a/util/stream/pipe.h b/util/stream/pipe.h
new file mode 100644
index 0000000000..18525b9517
--- /dev/null
+++ b/util/stream/pipe.h
@@ -0,0 +1,112 @@
+#pragma once
+
+#include "input.h"
+#include "output.h"
+
+#include <util/system/pipe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+
+/**
+ * @addtogroup Streams_Pipes
+ * @{
+ */
+
+/**
+ * Base class for starting a process and communicating with it via pipes.
+ */
+class TPipeBase {
+protected:
+ /**
+ * Starts a new process and opens a pipe.
+ *
+ * @param command Command line to start a process with.
+ * @param mode Data transfer mode for the pipe. Use
+ * "r" for reading and "w" for writing.
+ */
+ TPipeBase(const TString& command, const char* mode);
+ virtual ~TPipeBase();
+
+protected:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * Input stream that binds to a standard output stream of a newly started process.
+ *
+ * Note that if the process ends with non-zero exit status, `Read` function will
+ * throw an exception.
+ */
+class TPipeInput: protected TPipeBase, public IInputStream {
+public:
+ /**
+ * Starts a new process and opens a pipe.
+ *
+ * @param command Command line to start a process with.
+ */
+ TPipeInput(const TString& command);
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+};
+
+/**
+ * Output stream that binds to a standard input stream of a newly started process.
+ *
+ * Note that if the process ends with non-zero exit status, `Close` function will
+ * throw an exception.
+ */
+class TPipeOutput: protected TPipeBase, public IOutputStream {
+public:
+ /**
+ * Starts a new process and opens a pipe.
+ *
+ * @param command Command line to start a process with.
+ */
+ TPipeOutput(const TString& command);
+
+ /**
+ * Waits for the process to terminate and throws an exception if it ended
+ * with a non-zero exit status.
+ */
+ void Close();
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+};
+
+class TPipedBase {
+protected:
+ TPipedBase(PIPEHANDLE fd);
+ virtual ~TPipedBase();
+
+protected:
+ TPipeHandle Handle_;
+};
+
+/**
+ * Input stream that binds to a standard output stream of an existing process.
+ */
+class TPipedInput: public TPipedBase, public IInputStream {
+public:
+ TPipedInput(PIPEHANDLE fd);
+ ~TPipedInput() override;
+
+private:
+ size_t DoRead(void* buf, size_t len) override;
+};
+
+/**
+ * Output stream that binds to a standard input stream of an existing process.
+ */
+class TPipedOutput: public TPipedBase, public IOutputStream {
+public:
+ TPipedOutput(PIPEHANDLE fd);
+ ~TPipedOutput() override;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+};
+
+/** @} */
diff --git a/util/stream/printf.cpp b/util/stream/printf.cpp
new file mode 100644
index 0000000000..f3eeca7afc
--- /dev/null
+++ b/util/stream/printf.cpp
@@ -0,0 +1,51 @@
+#include "output.h"
+#include "printf.h"
+
+#include <util/generic/scope.h>
+#include <util/memory/tempbuf.h>
+#include <util/generic/yexception.h>
+
+size_t Printf(IOutputStream& out, const char* fmt, ...) {
+ va_list lst;
+ va_start(lst, fmt);
+
+ Y_DEFER {
+ va_end(lst);
+ };
+
+ return Printf(out, fmt, lst);
+}
+
+static inline size_t TryPrintf(void* ptr, size_t len, IOutputStream& out, const char* fmt, va_list params) {
+ va_list lst;
+ va_copy(lst, params);
+ const int ret = vsnprintf((char*)ptr, len, fmt, lst);
+ va_end(lst);
+
+ if (ret < 0) {
+ return len;
+ }
+
+ if ((size_t)ret < len) {
+ out.Write(ptr, (size_t)ret);
+ }
+
+ return (size_t)ret;
+}
+
+size_t Printf(IOutputStream& out, const char* fmt, va_list params) {
+ size_t guess = 0;
+
+ while (true) {
+ TTempBuf tmp(guess);
+ const size_t ret = TryPrintf(tmp.Data(), tmp.Size(), out, fmt, params);
+
+ if (ret < tmp.Size()) {
+ return ret;
+ }
+
+ guess = Max(tmp.Size() * 2, ret + 1);
+ }
+
+ return 0;
+}
diff --git a/util/stream/printf.h b/util/stream/printf.h
new file mode 100644
index 0000000000..1c7ddc0664
--- /dev/null
+++ b/util/stream/printf.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <util/system/compat.h>
+
+class IOutputStream;
+
+/**
+ * Stream-based `printf` function. Prints formatted data into the provided stream.
+ * Works the same way as a standard C `printf`.
+ *
+ * @param out Stream to write into.
+ * @param fmt Format string.
+ * @param ... Additional arguments.
+ */
+size_t Y_PRINTF_FORMAT(2, 3) Printf(IOutputStream& out, const char* fmt, ...);
+
+/**
+ * Stream-based `vprintf` function. Prints formatted data from variable argument
+ * list into the provided stream. Works the same way as a standard C `vprintf`.
+ *
+ * @param out Stream to write into.
+ * @param fmt Format string.
+ * @param params Additional arguments as a variable argument list.
+ */
+size_t Y_PRINTF_FORMAT(2, 0) Printf(IOutputStream& out, const char* fmt, va_list params);
diff --git a/util/stream/printf_ut.cpp b/util/stream/printf_ut.cpp
new file mode 100644
index 0000000000..0eab167062
--- /dev/null
+++ b/util/stream/printf_ut.cpp
@@ -0,0 +1,33 @@
+#include "null.h"
+#include "printf.h"
+#include "str.h"
+
+#include <util/generic/string.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TStreamPrintfTest) {
+ Y_UNIT_TEST(TestPrintf) {
+ TStringStream ss;
+
+ UNIT_ASSERT_EQUAL(Printf(ss, "qw %s %d", "er", 1), 7);
+ UNIT_ASSERT_EQUAL(ss.Str(), "qw er 1");
+ }
+
+#ifdef __GNUC__
+ #pragma GCC diagnostic ignored "-Wformat-zero-length"
+#endif // __GNUC__
+
+ Y_UNIT_TEST(TestZeroString) {
+ UNIT_ASSERT_EQUAL(Printf(Cnull, ""), 0);
+ }
+
+ Y_UNIT_TEST(TestLargePrintf) {
+ TString s = NUnitTest::RandomString(1000000);
+ TStringStream ss;
+
+ Printf(ss, "%s", s.data());
+
+ UNIT_ASSERT_EQUAL(ss.Str(), s);
+ }
+}
diff --git a/util/stream/str.cpp b/util/stream/str.cpp
new file mode 100644
index 0000000000..13f0e8ef28
--- /dev/null
+++ b/util/stream/str.cpp
@@ -0,0 +1,44 @@
+#include "str.h"
+
+static constexpr size_t MIN_BUFFER_GROW_SIZE = 16;
+
+TStringInput::~TStringInput() = default;
+
+size_t TStringInput::DoNext(const void** ptr, size_t len) {
+ len = Min(len, S_->size() - Pos_);
+ *ptr = S_->data() + Pos_;
+ Pos_ += len;
+ return len;
+}
+
+void TStringInput::DoUndo(size_t len) {
+ Y_VERIFY(len <= Pos_);
+ Pos_ -= len;
+}
+
+TStringOutput::~TStringOutput() = default;
+
+size_t TStringOutput::DoNext(void** ptr) {
+ if (S_->size() == S_->capacity()) {
+ S_->reserve(FastClp2(S_->capacity() + MIN_BUFFER_GROW_SIZE));
+ }
+ size_t previousSize = S_->size();
+ ResizeUninitialized(*S_, S_->capacity());
+ *ptr = S_->begin() + previousSize;
+ return S_->size() - previousSize;
+}
+
+void TStringOutput::DoUndo(size_t len) {
+ Y_VERIFY(len <= S_->size(), "trying to undo more bytes than actually written");
+ S_->resize(S_->size() - len);
+}
+
+void TStringOutput::DoWrite(const void* buf, size_t len) {
+ S_->append((const char*)buf, len);
+}
+
+void TStringOutput::DoWriteC(char c) {
+ S_->push_back(c);
+}
+
+TStringStream::~TStringStream() = default;
diff --git a/util/stream/str.h b/util/stream/str.h
new file mode 100644
index 0000000000..028bd572c0
--- /dev/null
+++ b/util/stream/str.h
@@ -0,0 +1,215 @@
+#pragma once
+
+#include "zerocopy.h"
+#include "zerocopy_output.h"
+
+#include <util/generic/string.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/store_policy.h>
+
+/**
+ * @addtogroup Streams_Strings
+ * @{
+ */
+
+/**
+ * Input stream for reading data from a string.
+ */
+class TStringInput: public IZeroCopyInputFastReadTo {
+public:
+ /**
+ * Constructs a string input stream that reads character data from the
+ * provided string.
+ *
+ * Note that this stream keeps a reference to the provided string, so it's
+ * up to the user to make sure that the string doesn't get destroyed while
+ * this stream is in use.
+ *
+ * For reading data from `TStringBuf`s, see `TMemoryInput` (`util/stream/mem.h`).
+ *
+ * @param s String to read from.
+ */
+ inline TStringInput(const TString& s) noexcept
+ : S_(&s)
+ , Pos_(0)
+ {
+ }
+
+ TStringInput(const TString&&) = delete;
+
+ ~TStringInput() override;
+
+ TStringInput(TStringInput&&) noexcept = default;
+ TStringInput& operator=(TStringInput&&) noexcept = default;
+
+ inline void Swap(TStringInput& s) noexcept {
+ DoSwap(S_, s.S_);
+ DoSwap(Pos_, s.Pos_);
+ }
+
+protected:
+ size_t DoNext(const void** ptr, size_t len) override;
+ void DoUndo(size_t len) override;
+
+private:
+ const TString* S_;
+ size_t Pos_;
+
+ friend class TStringStream;
+};
+
+/**
+ * Stream for writing data into a string.
+ */
+class TStringOutput: public IZeroCopyOutput {
+public:
+ /**
+ * Constructs a string output stream that appends character data to the
+ * provided string.
+ *
+ * Note that this stream keeps a reference to the provided string, so it's
+ * up to the user to make sure that the string doesn't get destroyed while
+ * this stream is in use.
+ *
+ * @param s String to append to.
+ */
+ inline TStringOutput(TString& s) noexcept
+ : S_(&s)
+ {
+ }
+
+ TStringOutput(TStringOutput&& s) noexcept = default;
+
+ ~TStringOutput() override;
+
+ /**
+ * @param size Number of additional characters to
+ * reserve in output string.
+ */
+ inline void Reserve(size_t size) {
+ S_->reserve(S_->size() + size);
+ }
+
+ inline void Swap(TStringOutput& s) noexcept {
+ DoSwap(S_, s.S_);
+ }
+
+protected:
+ size_t DoNext(void** ptr) override;
+ void DoUndo(size_t len) override;
+ void DoWrite(const void* buf, size_t len) override;
+ void DoWriteC(char c) override;
+
+private:
+ TString* S_;
+};
+
+/**
+ * String input/output stream, similar to `std::stringstream`.
+ */
+class TStringStream: private TEmbedPolicy<TString>, public TStringInput, public TStringOutput {
+ using TEmbeddedString = TEmbedPolicy<TString>;
+
+public:
+ inline TStringStream()
+ : TEmbeddedString()
+ , TStringInput(*TEmbeddedString::Ptr())
+ , TStringOutput(*TEmbeddedString::Ptr())
+ {
+ }
+
+ inline TStringStream(const TString& string)
+ : TEmbeddedString(string)
+ , TStringInput(*TEmbeddedString::Ptr())
+ , TStringOutput(*TEmbeddedString::Ptr())
+ {
+ }
+
+ inline TStringStream(const TStringStream& other)
+ : TEmbeddedString(other.Str())
+ , TStringInput(*TEmbeddedString::Ptr())
+ , TStringOutput(*TEmbeddedString::Ptr())
+ {
+ }
+
+ inline TStringStream& operator=(const TStringStream& other) {
+ // All references remain alive, we need to change position only
+ Str() = other.Str();
+ Pos_ = other.Pos_;
+
+ return *this;
+ }
+
+ ~TStringStream() override;
+
+ /**
+ * @returns Whether @c this contains any data
+ */
+ explicit operator bool() const noexcept {
+ return !Empty();
+ }
+
+ /**
+ * @returns String that this stream is writing into.
+ */
+ inline TString& Str() noexcept {
+ return *Ptr();
+ }
+
+ /**
+ * @returns String that this stream is writing into.
+ */
+ inline const TString& Str() const noexcept {
+ return *Ptr();
+ }
+
+ /**
+ * @returns Pointer to the character data contained
+ * in this stream. The data is guaranteed
+ * to be null-terminated.
+ */
+ inline const char* Data() const noexcept {
+ return Ptr()->data();
+ }
+
+ /**
+ * @returns Total number of characters in this
+ * stream. Note that this is not the same
+ * as the total number of characters
+ * available for reading.
+ */
+ inline size_t Size() const noexcept {
+ return Ptr()->size();
+ }
+
+ /**
+ * @returns Whether the string that this stream
+ * operates on is empty.
+ */
+ Y_PURE_FUNCTION inline bool Empty() const noexcept {
+ return Str().empty();
+ }
+
+ using TStringOutput::Reserve;
+
+ /**
+ * Clears the string that this stream operates on and resets the
+ * read/write pointers.
+ */
+ inline void Clear() {
+ Str().clear();
+ Pos_ = 0;
+ }
+
+ // TODO: compatibility with existing code, remove
+
+ Y_PURE_FUNCTION bool empty() const {
+ return Empty();
+ }
+
+ void clear() {
+ Clear();
+ }
+};
+
+/** @} */
diff --git a/util/stream/str.pxd b/util/stream/str.pxd
new file mode 100644
index 0000000000..76dc16a822
--- /dev/null
+++ b/util/stream/str.pxd
@@ -0,0 +1,12 @@
+from util.generic.ptr cimport THolder
+from util.generic.string cimport TString, TStringBuf
+from util.stream.output cimport IOutputStream
+
+
+cdef extern from "<util/stream/str.h>" nogil:
+ cdef cppclass TStringOutput(IOutputStream):
+ TStringOutput() except+
+ TStringOutput(TString&) except+
+ void Reserve(size_t) except+
+
+ctypedef THolder[TStringOutput] TStringOutputPtr
diff --git a/util/stream/str_ut.cpp b/util/stream/str_ut.cpp
new file mode 100644
index 0000000000..fc6b46c31a
--- /dev/null
+++ b/util/stream/str_ut.cpp
@@ -0,0 +1,152 @@
+#include "str.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/generic/typetraits.h>
+
+template <typename T>
+const T ReturnConstTemp();
+
+Y_UNIT_TEST_SUITE(TStringInputOutputTest) {
+ Y_UNIT_TEST(Lvalue) {
+ TString str = "Hello, World!";
+ TStringInput input(str);
+
+ TString result = input.ReadAll();
+
+ UNIT_ASSERT_VALUES_EQUAL(result, str);
+ }
+
+ Y_UNIT_TEST(ConstRef) {
+ TString str = "Hello, World!";
+ const TString& r = str;
+ TStringInput input(r);
+
+ TString result = input.ReadAll();
+
+ UNIT_ASSERT_VALUES_EQUAL(result, str);
+ }
+
+ Y_UNIT_TEST(NonConstRef) {
+ TString str = "Hello, World!";
+ TString& r = str;
+ TStringInput input(r);
+
+ TString result = input.ReadAll();
+
+ UNIT_ASSERT_VALUES_EQUAL(result, str);
+ }
+
+ Y_UNIT_TEST(Transfer) {
+ TString inputString = "some_string";
+ TStringInput input(inputString);
+
+ TString outputString;
+ TStringOutput output(outputString);
+
+ TransferData(&input, &output);
+
+ UNIT_ASSERT_VALUES_EQUAL(inputString, outputString);
+ }
+
+ Y_UNIT_TEST(SkipReadAll) {
+ TString string0 = "All animals are equal, but some animals are more equal than others.";
+
+ TString string1;
+ for (size_t i = 1; i <= string0.size(); i++) {
+ string1 += string0.substr(0, i);
+ }
+
+ TStringInput input0(string1);
+
+ size_t left = 5;
+ while (left > 0) {
+ left -= input0.Skip(left);
+ }
+
+ TString string2 = input0.ReadAll();
+
+ UNIT_ASSERT_VALUES_EQUAL(string2, string1.substr(5));
+ }
+
+ Y_UNIT_TEST(OperatorBool) {
+ TStringStream str;
+ UNIT_ASSERT(!str);
+ str << "data";
+ UNIT_ASSERT(str);
+ str.Clear();
+ UNIT_ASSERT(!str);
+ }
+
+ Y_UNIT_TEST(TestReadTo) {
+ TString s("0123456789abc");
+ TString t;
+
+ TStringInput in0(s);
+ UNIT_ASSERT_VALUES_EQUAL(in0.ReadTo(t, '7'), 8);
+ UNIT_ASSERT_VALUES_EQUAL(t, "0123456");
+ UNIT_ASSERT_VALUES_EQUAL(in0.ReadTo(t, 'z'), 5);
+ UNIT_ASSERT_VALUES_EQUAL(t, "89abc");
+ }
+
+ Y_UNIT_TEST(WriteViaNextAndUndo) {
+ TString str1;
+ TStringOutput output(str1);
+ TString str2;
+
+ for (size_t i = 0; i < 10000; ++i) {
+ str2.push_back('a' + (i % 20));
+ }
+
+ size_t written = 0;
+ void* ptr = nullptr;
+ while (written < str2.size()) {
+ size_t bufferSize = output.Next(&ptr);
+ UNIT_ASSERT(ptr && bufferSize > 0);
+ size_t toWrite = Min(bufferSize, str2.size() - written);
+ memcpy(ptr, str2.begin() + written, toWrite);
+ written += toWrite;
+ if (toWrite < bufferSize) {
+ output.Undo(bufferSize - toWrite);
+ }
+ }
+
+ UNIT_ASSERT_STRINGS_EQUAL(str1, str2);
+ }
+
+ Y_UNIT_TEST(Write) {
+ TString str;
+ TStringOutput output(str);
+ output << "1"
+ << "22"
+ << "333"
+ << "4444"
+ << "55555";
+
+ UNIT_ASSERT_STRINGS_EQUAL(str, "1"
+ "22"
+ "333"
+ "4444"
+ "55555");
+ }
+
+ Y_UNIT_TEST(WriteChars) {
+ TString str;
+ TStringOutput output(str);
+ output << '1' << '2' << '3' << '4' << '5' << '6' << '7' << '8' << '9' << '0';
+
+ UNIT_ASSERT_STRINGS_EQUAL(str, "1234567890");
+ }
+
+ Y_UNIT_TEST(MoveConstructor) {
+ TString str;
+ TStringOutput output1(str);
+ output1 << "foo";
+
+ TStringOutput output2 = std::move(output1);
+ output2 << "bar";
+ UNIT_ASSERT_STRINGS_EQUAL(str, "foobar");
+
+ // Check old stream is in a valid state
+ output1 << "baz";
+ }
+}
diff --git a/util/stream/str_ut.pyx b/util/stream/str_ut.pyx
new file mode 100644
index 0000000000..2ae617303f
--- /dev/null
+++ b/util/stream/str_ut.pyx
@@ -0,0 +1,62 @@
+# cython: c_string_type=str, c_string_encoding=utf8
+
+from cython.operator cimport dereference
+
+from util.generic.ptr cimport THolder
+from util.generic.string cimport TString, TStringBuf
+from util.stream.str cimport TStringOutput, TStringOutputPtr
+
+import unittest
+
+
+class TestStringOutput(unittest.TestCase):
+ def test_ctor1(self):
+ cdef TStringOutput output
+
+ def test_ctor2(self):
+ cdef TString string
+ cdef THolder[TStringOutput] string_output = THolder[TStringOutput](new TStringOutput(string))
+
+ def test_write_char(self):
+ cdef TString string
+ cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string))
+
+ self.assertEqual(string, "")
+ dereference(string_output.Get()).WriteChar('1')
+ self.assertEqual(string, "1")
+ dereference(string_output.Get()).WriteChar('2')
+ self.assertEqual(string, "12")
+ dereference(string_output.Get()).WriteChar('3')
+ self.assertEqual(string, "123")
+
+ def test_write_void(self):
+ cdef TString string
+ cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string))
+
+ self.assertEqual(string, "")
+ dereference(string_output.Get()).Write("1", 1)
+ self.assertEqual(string, "1")
+ dereference(string_output.Get()).Write("2", 1)
+ self.assertEqual(string, "12")
+ dereference(string_output.Get()).Write("34", 2)
+ self.assertEqual(string, "1234")
+
+ def test_write_buf(self):
+ cdef TString string
+ cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string))
+
+ self.assertEqual(string, "")
+ dereference(string_output.Get()).WriteBuf(TStringBuf("1"))
+ self.assertEqual(string, "1")
+ dereference(string_output.Get()).WriteBuf(TStringBuf("2"))
+ self.assertEqual(string, "12")
+ dereference(string_output.Get()).WriteBuf(TStringBuf("34"))
+ self.assertEqual(string, "1234")
+
+ def test_reserve(self):
+ cdef TString string
+ cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string))
+ self.assertEqual(string, "")
+ dereference(string_output.Get()).Reserve(50)
+ self.assertEqual(string, "")
+ self.assertLessEqual(50, string.capacity())
diff --git a/util/stream/tee.cpp b/util/stream/tee.cpp
new file mode 100644
index 0000000000..99873b95ba
--- /dev/null
+++ b/util/stream/tee.cpp
@@ -0,0 +1,24 @@
+#include "tee.h"
+
+TTeeOutput::TTeeOutput(IOutputStream* l, IOutputStream* r) noexcept
+ : L_(l)
+ , R_(r)
+{
+}
+
+TTeeOutput::~TTeeOutput() = default;
+
+void TTeeOutput::DoWrite(const void* buf, size_t len) {
+ L_->Write(buf, len);
+ R_->Write(buf, len);
+}
+
+void TTeeOutput::DoFlush() {
+ L_->Flush();
+ R_->Flush();
+}
+
+void TTeeOutput::DoFinish() {
+ L_->Finish();
+ R_->Finish();
+}
diff --git a/util/stream/tee.h b/util/stream/tee.h
new file mode 100644
index 0000000000..c69e232fb9
--- /dev/null
+++ b/util/stream/tee.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include "output.h"
+
+/**
+ * @addtogroup Streams_Multi
+ * @{
+ */
+
+/**
+ * A proxy output stream that writes into two slave streams simultaneously.
+ */
+class TTeeOutput: public IOutputStream {
+public:
+ TTeeOutput(IOutputStream* l, IOutputStream* r) noexcept;
+ ~TTeeOutput() override;
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+private:
+ IOutputStream* L_;
+ IOutputStream* R_;
+};
+
+/** @} */
diff --git a/util/stream/tempbuf.cpp b/util/stream/tempbuf.cpp
new file mode 100644
index 0000000000..801a1fabb0
--- /dev/null
+++ b/util/stream/tempbuf.cpp
@@ -0,0 +1,22 @@
+#include "tempbuf.h"
+
+namespace {
+ static inline size_t Next(size_t size) noexcept {
+ return size * 2;
+ }
+}
+
+void TTempBufOutput::DoWrite(const void* data, size_t len) {
+ if (Y_LIKELY(len <= Left())) {
+ Append(data, len);
+ } else {
+ const size_t filled = Filled();
+
+ TTempBuf buf(Next(filled + len));
+
+ buf.Append(Data(), filled);
+ buf.Append(data, len);
+
+ static_cast<TTempBuf&>(*this) = buf;
+ }
+}
diff --git a/util/stream/tempbuf.h b/util/stream/tempbuf.h
new file mode 100644
index 0000000000..a6dc001025
--- /dev/null
+++ b/util/stream/tempbuf.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "output.h"
+
+#include <util/memory/tempbuf.h>
+
+class TTempBufOutput: public IOutputStream, public TTempBuf {
+public:
+ inline TTempBufOutput() = default;
+
+ explicit TTempBufOutput(size_t size)
+ : TTempBuf(size)
+ {
+ }
+
+ TTempBufOutput(TTempBufOutput&&) noexcept = default;
+ TTempBufOutput& operator=(TTempBufOutput&&) noexcept = default;
+
+protected:
+ void DoWrite(const void* data, size_t len) override;
+};
diff --git a/util/stream/tokenizer.cpp b/util/stream/tokenizer.cpp
new file mode 100644
index 0000000000..44e719530a
--- /dev/null
+++ b/util/stream/tokenizer.cpp
@@ -0,0 +1 @@
+#include "tokenizer.h"
diff --git a/util/stream/tokenizer.h b/util/stream/tokenizer.h
new file mode 100644
index 0000000000..b2398efdd1
--- /dev/null
+++ b/util/stream/tokenizer.h
@@ -0,0 +1,214 @@
+#pragma once
+
+#include "input.h"
+
+#include <util/generic/buffer.h>
+#include <util/generic/mem_copy.h>
+#include <util/generic/strbuf.h>
+#include <util/system/compiler.h>
+#include <util/system/yassert.h>
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Simple stream tokenizer. Splits the stream into tokens that are available
+ * via iterator interface.
+ *
+ * @tparam TEndOfToken Predicate for token delimiter characters.
+ * @see TEol
+ */
+template <typename TEndOfToken>
+class TStreamTokenizer {
+public:
+ class TIterator {
+ public:
+ inline TIterator(TStreamTokenizer* const parent)
+ : Parent_(parent)
+ , AtEnd_(!Parent_->Next(Data_, Len_))
+ {
+ }
+
+ inline TIterator() noexcept
+ : Parent_(nullptr)
+ , Data_(nullptr)
+ , Len_(0)
+ , AtEnd_(true)
+ {
+ }
+
+ inline ~TIterator() = default;
+
+ inline void operator++() {
+ Next();
+ }
+
+ inline bool operator==(const TIterator& l) const noexcept {
+ return AtEnd_ == l.AtEnd_;
+ }
+
+ inline bool operator!=(const TIterator& l) const noexcept {
+ return !(*this == l);
+ }
+
+ /**
+ * @return Return null-terminated character array with current token.
+ * The pointer may be invalid after iterator increment.
+ */
+ inline const char* Data() const noexcept {
+ Y_ASSERT(!AtEnd_);
+
+ return Data_;
+ }
+
+ /**
+ * @return Length of current token.
+ */
+ inline size_t Length() const noexcept {
+ Y_ASSERT(!AtEnd_);
+
+ return Len_;
+ }
+
+ inline TIterator* operator->() noexcept {
+ return this;
+ }
+
+ inline TStringBuf operator*() noexcept {
+ return TStringBuf{Data_, Len_};
+ }
+
+ private:
+ inline void Next() {
+ Y_ASSERT(Parent_);
+
+ AtEnd_ = !Parent_->Next(Data_, Len_);
+ }
+
+ private:
+ TStreamTokenizer* const Parent_;
+ char* Data_;
+ size_t Len_;
+ bool AtEnd_;
+ };
+
+ inline TStreamTokenizer(IInputStream* const input, const TEndOfToken& eot = TEndOfToken(),
+ const size_t initial = 1024)
+ : Input_(input)
+ , Buf_(initial)
+ , Cur_(BufBegin())
+ , End_(BufBegin())
+ , Eot_(eot)
+ {
+ CheckBuf();
+ }
+
+ inline bool Next(char*& buf, size_t& len) {
+ char* it = Cur_;
+
+ while (true) {
+ do {
+ while (it != End_) {
+ if (Eot_(*it)) {
+ *it = '\0';
+
+ buf = Cur_;
+ len = it - Cur_;
+ Cur_ = it + 1;
+
+ return true;
+ } else {
+ ++it;
+ }
+ }
+
+ if (Fill() == 0 && End_ != BufEnd()) {
+ *it = '\0';
+
+ buf = Cur_;
+ len = it - Cur_;
+ Cur_ = End_;
+
+ return len;
+ }
+ } while (it != BufEnd());
+
+ Y_ASSERT(it == BufEnd());
+ Y_ASSERT(End_ == BufEnd());
+
+ const size_t blen = End_ - Cur_;
+ if (Cur_ == BufBegin()) {
+ Y_ASSERT(blen == Buf_.Capacity());
+
+ /*
+ * do reallocate
+ */
+
+ Buf_.Reserve(Buf_.Capacity() * 4);
+ CheckBuf();
+ } else {
+ /*
+ * do move
+ */
+
+ MemMove(BufBegin(), Cur_, blen);
+ }
+
+ Cur_ = BufBegin();
+ End_ = Cur_ + blen;
+ it = End_;
+ }
+ }
+
+ inline TIterator begin() {
+ return TIterator{this};
+ }
+
+ inline TIterator end() noexcept {
+ return {};
+ }
+
+private:
+ inline size_t Fill() {
+ const size_t avail = BufEnd() - End_;
+ const size_t bytesRead = Input_->Read(End_, avail);
+
+ End_ += bytesRead;
+
+ return bytesRead;
+ }
+
+ inline char* BufBegin() noexcept {
+ return Buf_.Data();
+ }
+
+ inline char* BufEnd() noexcept {
+ return Buf_.Data() + Buf_.Capacity();
+ }
+
+ inline void CheckBuf() const {
+ if (!Buf_.Data()) {
+ throw std::bad_alloc();
+ }
+ }
+
+private:
+ IInputStream* const Input_;
+ TBuffer Buf_;
+ char* Cur_;
+ char* End_;
+ TEndOfToken Eot_;
+};
+
+/**
+ * Predicate for `TStreamTokenizer` that uses '\\n' as a delimiter.
+ */
+struct TEol {
+ inline bool operator()(char ch) const noexcept {
+ return ch == '\n';
+ }
+};
+
+/** @} */
diff --git a/util/stream/tokenizer_ut.cpp b/util/stream/tokenizer_ut.cpp
new file mode 100644
index 0000000000..afc566da86
--- /dev/null
+++ b/util/stream/tokenizer_ut.cpp
@@ -0,0 +1,264 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/array_size.h>
+#include <util/generic/strbuf.h>
+
+#include "mem.h"
+#include "null.h"
+#include "tokenizer.h"
+
+static inline void CheckIfNullTerminated(const TStringBuf str) {
+ UNIT_ASSERT_VALUES_EQUAL('\0', *(str.data() + str.size()));
+}
+
+Y_UNIT_TEST_SUITE(TStreamTokenizerTests) {
+ Y_UNIT_TEST(EmptyStreamTest) {
+ auto&& input = TNullInput{};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ CheckIfNullTerminated(TStringBuf{it->Data(), it->Length()});
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(0, tokensCount);
+ }
+
+ Y_UNIT_TEST(EmptyTokensTest) {
+ const char data[] = "\n\n";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ CheckIfNullTerminated(TStringBuf{it->Data(), it->Length()});
+ UNIT_ASSERT_VALUES_EQUAL(0, it->Length());
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(2, tokensCount);
+ }
+
+ Y_UNIT_TEST(LastTokenendDoesntSatisfyPredicateTest) {
+ const char data[] = "abc\ndef\nxxxxxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ UNIT_ASSERT(tokensCount < tokensSize);
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(FirstTokenIsEmptyTest) {
+ const char data[] = "\ndef\nxxxxxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf(), TStringBuf("def"), TStringBuf("xxxxxx")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ UNIT_ASSERT(tokensCount < tokensSize);
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(PredicateDoesntMatch) {
+ const char data[] = "1234567890-=!@#$%^&*()_+QWERTYUIOP{}qwertyuiop[]ASDFGHJKL:";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(data, token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(1, tokensCount);
+ }
+
+ Y_UNIT_TEST(SimpleTest) {
+ const char data[] = "qwerty\n1234567890\n";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf("qwerty"), TStringBuf("1234567890")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ UNIT_ASSERT(tokensCount < tokensSize);
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(CustomPredicateTest) {
+ struct TIsVerticalBar {
+ inline bool operator()(const char ch) const noexcept {
+ return '|' == ch;
+ }
+ };
+
+ const char data[] = "abc|def|xxxxxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TIsVerticalBar>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ UNIT_ASSERT(tokensCount < tokensSize);
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(CustomPredicateSecondTest) {
+ struct TIsVerticalBar {
+ inline bool operator()(const char ch) const noexcept {
+ return '|' == ch || ',' == ch;
+ }
+ };
+
+ const char data[] = "abc|def|xxxxxx,abc|def|xxxxxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx"),
+ TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TIsVerticalBar>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ UNIT_ASSERT(tokensCount < tokensSize);
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(FalsePredicateTest) {
+ struct TAlwaysFalse {
+ inline bool operator()(const char) const noexcept {
+ return false;
+ }
+ };
+
+ const char data[] = "1234567890-=!@#$%^&*()_+QWERTYUIOP{}qwertyuiop[]ASDFGHJKL:";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TAlwaysFalse>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(data, token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(1, tokensCount);
+ }
+
+ Y_UNIT_TEST(TruePredicateTest) {
+ struct TAlwaysTrue {
+ inline bool operator()(const char) const noexcept {
+ return true;
+ }
+ };
+
+ const char data[] = "1234567890-=!@#$%^&*()_+QWERTYUIOP{}qwertyuiop[]ASDFGHJKL:";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TAlwaysTrue>{&input};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ CheckIfNullTerminated(TStringBuf{it->Data(), it->Length()});
+ UNIT_ASSERT_VALUES_EQUAL(0, it->Length());
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(dataSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(FirstTokenHasSizeOfTheBufferTest) {
+ const char data[] = "xxxxx\nxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf("xxxxx"), TStringBuf("xx")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input, TEol{}, tokens[0].size()};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(OnlyTokenHasSizeOfTheBufferTest) {
+ const char data[] = "xxxxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input, TEol{}, dataSize};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(data, token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(1, tokensCount);
+ }
+
+ Y_UNIT_TEST(BufferSizeInitialSizeSmallerThanTokenTest) {
+ const char data[] = "xxxxx\nxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf("xxxxx"), TStringBuf("xx")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input, TEol{}, 1};
+ auto tokensCount = size_t{};
+ for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) {
+ const auto token = TStringBuf{it->Data(), it->Length()};
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+
+ Y_UNIT_TEST(RangeBasedForTest) {
+ const char data[] = "abc\ndef\nxxxxxx";
+ const auto dataSize = Y_ARRAY_SIZE(data) - 1;
+ const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")};
+ const auto tokensSize = Y_ARRAY_SIZE(tokens);
+ auto&& input = TMemoryInput{data, dataSize};
+ auto&& tokenizer = TStreamTokenizer<TEol>{&input};
+ auto tokensCount = size_t{};
+ for (const auto& token : tokenizer) {
+ UNIT_ASSERT(tokensCount < tokensSize);
+ CheckIfNullTerminated(token);
+ UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token);
+ ++tokensCount;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount);
+ }
+}
diff --git a/util/stream/trace.cpp b/util/stream/trace.cpp
new file mode 100644
index 0000000000..f37a0b76db
--- /dev/null
+++ b/util/stream/trace.cpp
@@ -0,0 +1 @@
+#include "trace.h"
diff --git a/util/stream/trace.h b/util/stream/trace.h
new file mode 100644
index 0000000000..e74b6ecf3e
--- /dev/null
+++ b/util/stream/trace.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include "debug.h"
+
+/**
+ * Debug level, as set via `DBGOUT` environment variable.
+ */
+enum ETraceLevel: ui8 {
+ TRACE_ERR = 1,
+ TRACE_WARN = 2,
+ TRACE_NOTICE = 3,
+ TRACE_INFO = 4,
+ TRACE_DEBUG = 5,
+ TRACE_DETAIL = 6,
+ TRACE_VERBOSE = 7
+};
+
+#if !defined(NDEBUG) && !defined(Y_ENABLE_TRACE)
+ #define Y_ENABLE_TRACE
+#endif
+
+#ifdef Y_ENABLE_TRACE
+
+ /**
+ * Writes the given data into standard debug stream if current debug level set
+ * via `DBGOUT` environment variable permits it.
+ *
+ * Does nothing in release builds unless `Y_ENABLE_TRACE` is defined.
+ *
+ * Example usage:
+ * @code
+ * Y_DBGTRACE(DEBUG, "Advance from " << node1 << " to " << node2);
+ * @endcode
+ *
+ * @param elevel Debug level of this trace command, e.g.
+ * `WARN` or `DEBUG`. Basically a suffix of
+ * one of the values of `ETraceLevel` enum.
+ * @param args Argument chain to be written out into
+ * standard debug stream, joined with `<<`
+ * operator.
+ * @see ETraceLevel
+ */
+ #define Y_DBGTRACE(elevel, args) Y_DBGTRACE0(int(TRACE_##elevel), args)
+ #define Y_DBGTRACE0(level, args) \
+ do \
+ if ((level) <= StdDbgLevel()) { \
+ StdDbgStream() << args << Endl; \
+ } \
+ while (false)
+
+#else
+
+ #define Y_DBGTRACE(elevel, args) \
+ do { \
+ } while (false)
+ #define Y_DBGTRACE0(level, args) \
+ do { \
+ } while (false)
+
+#endif
diff --git a/util/stream/ut/ya.make b/util/stream/ut/ya.make
new file mode 100644
index 0000000000..f0176dd7b4
--- /dev/null
+++ b/util/stream/ut/ya.make
@@ -0,0 +1,30 @@
+UNITTEST_FOR(util)
+
+OWNER(g:util)
+SUBSCRIBER(g:util-subscribers)
+
+SRCS(
+ stream/aligned_ut.cpp
+ stream/buffer_ut.cpp
+ stream/buffered_ut.cpp
+ stream/direct_io_ut.cpp
+ stream/file_ut.cpp
+ stream/format_ut.cpp
+ stream/hex_ut.cpp
+ stream/input_ut.cpp
+ stream/ios_ut.cpp
+ stream/labeled_ut.cpp
+ stream/length_ut.cpp
+ stream/mem_ut.cpp
+ stream/multi_ut.cpp
+ stream/printf_ut.cpp
+ stream/str_ut.cpp
+ stream/tokenizer_ut.cpp
+ stream/walk_ut.cpp
+ stream/zerocopy_output_ut.cpp
+ stream/zlib_ut.cpp
+)
+
+INCLUDE(${ARCADIA_ROOT}/util/tests/ya_util_tests.inc)
+
+END()
diff --git a/util/stream/walk.cpp b/util/stream/walk.cpp
new file mode 100644
index 0000000000..57dc9ab036
--- /dev/null
+++ b/util/stream/walk.cpp
@@ -0,0 +1,22 @@
+#include "walk.h"
+
+#include <util/generic/string.h>
+
+void IWalkInput::DoUndo(size_t len) {
+ Len_ += len;
+ Buf_ = static_cast<const char*>(Buf_) - len;
+}
+
+size_t IWalkInput::DoNext(const void** ptr, size_t len) {
+ if (!Len_) {
+ Len_ = DoUnboundedNext(&Buf_);
+ }
+
+ len = Min(Len_, len);
+ *ptr = Buf_;
+
+ Buf_ = static_cast<const char*>(Buf_) + len;
+ Len_ -= len;
+
+ return len;
+}
diff --git a/util/stream/walk.h b/util/stream/walk.h
new file mode 100644
index 0000000000..7e62cb44dc
--- /dev/null
+++ b/util/stream/walk.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include "zerocopy.h"
+
+/**
+ * Zero-copy stream that simplifies implementation of derived classes.
+ *
+ * Derived classes must implement `DoUnboundedNext` method.
+ */
+class IWalkInput: public IZeroCopyInputFastReadTo {
+public:
+ IWalkInput()
+ : Buf_(nullptr)
+ , Len_(0)
+ {
+ }
+
+protected:
+ void DoUndo(size_t len) override;
+ size_t DoNext(const void** ptr, size_t len) override;
+
+ /**
+ * Returns the next data chunk from this input stream. There are no
+ * restrictions on the size of the data chunk.
+ *
+ * @param ptr[out] Pointer to the start of the data chunk.
+ * @returns Size of the returned data chunk, in bytes.
+ * Return value of zero signals end of stream.
+ */
+ virtual size_t DoUnboundedNext(const void** ptr) = 0;
+
+private:
+ const void* Buf_;
+ size_t Len_;
+};
diff --git a/util/stream/walk_ut.cpp b/util/stream/walk_ut.cpp
new file mode 100644
index 0000000000..e0a783799f
--- /dev/null
+++ b/util/stream/walk_ut.cpp
@@ -0,0 +1,55 @@
+#include "walk.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+class TStringListInput: public IWalkInput {
+public:
+ TStringListInput(const TVector<TString>& data)
+ : Data_(data)
+ , Index_(0)
+ {
+ }
+
+protected:
+ size_t DoUnboundedNext(const void** ptr) override {
+ if (Index_ >= Data_.size()) {
+ return 0;
+ }
+
+ const TString& string = Data_[Index_++];
+
+ *ptr = string.data();
+ return string.size();
+ }
+
+private:
+ const TVector<TString>& Data_;
+ size_t Index_;
+};
+
+Y_UNIT_TEST_SUITE(TWalkTest) {
+ Y_UNIT_TEST(ReadTo) {
+ TVector<TString> data;
+ data.push_back("111a");
+ data.push_back("222b");
+ data.push_back("333c");
+ data.push_back("444d");
+ data.push_back("555e");
+ data.push_back("666f");
+
+ TStringListInput input(data);
+
+ TString tmp1 = input.ReadTo('c');
+ UNIT_ASSERT_VALUES_EQUAL(tmp1, "111a222b333");
+
+ char tmp2;
+ input.Read(&tmp2, 1);
+ UNIT_ASSERT_VALUES_EQUAL(tmp2, '4');
+
+ TString tmp3 = input.ReadTo('6');
+ UNIT_ASSERT_VALUES_EQUAL(tmp3, "44d555e");
+
+ TString tmp4 = input.ReadAll();
+ UNIT_ASSERT_VALUES_EQUAL(tmp4, "66f");
+ }
+}
diff --git a/util/stream/ya.make b/util/stream/ya.make
new file mode 100644
index 0000000000..79c9498ddd
--- /dev/null
+++ b/util/stream/ya.make
@@ -0,0 +1,6 @@
+OWNER(g:util)
+SUBSCRIBER(g:util-subscribers)
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/util/stream/zerocopy.cpp b/util/stream/zerocopy.cpp
new file mode 100644
index 0000000000..dc2982ad55
--- /dev/null
+++ b/util/stream/zerocopy.cpp
@@ -0,0 +1,60 @@
+#include "zerocopy.h"
+#include "output.h"
+
+IZeroCopyInput::~IZeroCopyInput() = default;
+
+size_t IZeroCopyInput::DoRead(void* buf, size_t len) {
+ const void* ptr;
+ size_t result = DoNext(&ptr, len);
+
+ if (result) {
+ memcpy(buf, ptr, result);
+ }
+
+ return result;
+}
+
+ui64 IZeroCopyInput::DoReadAll(IOutputStream& out) {
+ ui64 result = 0;
+ const void* ptr;
+
+ while (size_t len = Next(&ptr)) {
+ out.Write(ptr, len);
+ result += len;
+ }
+
+ return result;
+}
+
+size_t IZeroCopyInput::DoSkip(size_t len) {
+ const void* ptr;
+
+ return DoNext(&ptr, len);
+}
+
+IZeroCopyInputFastReadTo::~IZeroCopyInputFastReadTo() = default;
+
+size_t IZeroCopyInputFastReadTo::DoReadTo(TString& st, char ch) {
+ const char* ptr;
+ size_t len = Next(&ptr);
+ if (!len) {
+ return 0;
+ }
+ size_t result = 0;
+ st.clear();
+ do {
+ if (const char* pos = (const char*)memchr(ptr, ch, len)) {
+ size_t bytesRead = (pos - ptr) + 1;
+ if (bytesRead > 1) {
+ st.append(ptr, pos);
+ }
+ Undo(len - bytesRead);
+ result += bytesRead;
+ return result;
+ } else {
+ result += len;
+ st.append(ptr, len);
+ }
+ } while (len = Next(&ptr));
+ return result;
+}
diff --git a/util/stream/zerocopy.h b/util/stream/zerocopy.h
new file mode 100644
index 0000000000..3315aa3a51
--- /dev/null
+++ b/util/stream/zerocopy.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include <util/system/yassert.h>
+#include <util/system/defaults.h>
+#include <util/generic/ylimits.h>
+
+#include "input.h"
+
+class IOutputStream;
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Input stream with direct access to the input buffer.
+ *
+ * Derived classes must implement `DoNext` method.
+ */
+class IZeroCopyInput: public IInputStream {
+public:
+ IZeroCopyInput() noexcept = default;
+ ~IZeroCopyInput() override;
+
+ IZeroCopyInput(IZeroCopyInput&&) noexcept = default;
+ IZeroCopyInput& operator=(IZeroCopyInput&&) noexcept = default;
+
+ /**
+ * Returns the next data chunk from this input stream.
+ *
+ * Note that this function is not guaranteed to return the requested number
+ * of bytes, even if they are in fact available in the stream.
+ *
+ * @param ptr[out] Pointer to the start of the data chunk.
+ * @param len[in] Maximal size of the data chunk to be returned, in bytes.
+ * @returns Size of the returned data chunk, in bytes.
+ * Return value of zero signals end of stream.
+ */
+ template <class T>
+ inline size_t Next(T** ptr, size_t len) {
+ Y_ASSERT(ptr);
+
+ return DoNext((const void**)ptr, len);
+ }
+
+ template <class T>
+ inline size_t Next(T** ptr) {
+ return Next(ptr, Max<size_t>());
+ }
+
+protected:
+ size_t DoRead(void* buf, size_t len) override;
+ size_t DoSkip(size_t len) override;
+ ui64 DoReadAll(IOutputStream& out) override;
+ virtual size_t DoNext(const void** ptr, size_t len) = 0;
+};
+
+/**
+* Input stream with direct access to the input buffer and ability to undo read
+*
+* Derived classes must implement `DoUndo` method.
+*/
+class IZeroCopyInputFastReadTo: public IZeroCopyInput {
+public:
+ IZeroCopyInputFastReadTo() noexcept = default;
+ ~IZeroCopyInputFastReadTo() override;
+
+ IZeroCopyInputFastReadTo(IZeroCopyInputFastReadTo&&) noexcept = default;
+ IZeroCopyInputFastReadTo& operator=(IZeroCopyInputFastReadTo&&) noexcept = default;
+
+protected:
+ size_t DoReadTo(TString& st, char ch) override;
+
+private:
+ /**
+ * Undo read.
+ *
+ * Note that this function not check if you try undo more that read. In fact Undo used for undo read in last chunk.
+ *
+ * @param len[in] Bytes to undo.
+ */
+ inline void Undo(size_t len) {
+ if (len) {
+ DoUndo(len);
+ }
+ }
+ virtual void DoUndo(size_t len) = 0;
+};
+
+/** @} */
diff --git a/util/stream/zerocopy_output.cpp b/util/stream/zerocopy_output.cpp
new file mode 100644
index 0000000000..23600ef6e1
--- /dev/null
+++ b/util/stream/zerocopy_output.cpp
@@ -0,0 +1,18 @@
+#include "zerocopy_output.h"
+
+#include <util/generic/utility.h>
+
+void IZeroCopyOutput::DoWrite(const void* buf, size_t len) {
+ void* ptr = nullptr;
+ size_t writtenBytes = 0;
+ while (writtenBytes < len) {
+ size_t bufferSize = DoNext(&ptr);
+ Y_ASSERT(ptr && bufferSize > 0);
+ size_t toWrite = Min(bufferSize, len - writtenBytes);
+ memcpy(ptr, static_cast<const char*>(buf) + writtenBytes, toWrite);
+ writtenBytes += toWrite;
+ if (toWrite < bufferSize) {
+ DoUndo(bufferSize - toWrite);
+ }
+ }
+}
diff --git a/util/stream/zerocopy_output.h b/util/stream/zerocopy_output.h
new file mode 100644
index 0000000000..a388be04b0
--- /dev/null
+++ b/util/stream/zerocopy_output.h
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <util/system/yassert.h>
+
+#include "output.h"
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Output stream with direct access to the output buffer.
+ *
+ * Derived classes must implement `DoNext` and `DoUndo` methods.
+ */
+class IZeroCopyOutput: public IOutputStream {
+public:
+ IZeroCopyOutput() noexcept = default;
+ ~IZeroCopyOutput() override = default;
+
+ IZeroCopyOutput(IZeroCopyOutput&&) noexcept = default;
+ IZeroCopyOutput& operator=(IZeroCopyOutput&&) noexcept = default;
+
+ /**
+ * Returns the next buffer to write to from this output stream.
+ *
+ * @param ptr[out] Pointer to the start of the buffer.
+ * @returns Size of the returned buffer, in bytes.
+ * Return value is always nonzero.
+ */
+ template <class T>
+ inline size_t Next(T** ptr) {
+ Y_ASSERT(ptr);
+
+ return DoNext((void**)ptr);
+ }
+
+ /**
+ * Tells the stream that `len` bytes at the end of the buffer returned previously
+ * by Next were actually not written so the current position in stream must be moved backwards.
+ * `len` must not be greater than the size of the buffer previously returned by `Next`.
+ *
+ * @param len[in] Number of bytes at the end to move the position by.
+ *
+ */
+ inline void Undo(size_t len) {
+ return DoUndo(len);
+ }
+
+protected:
+ void DoWrite(const void* buf, size_t len) override;
+ virtual size_t DoNext(void** ptr) = 0;
+ virtual void DoUndo(size_t len) = 0;
+};
+
+/** @} */
diff --git a/util/stream/zerocopy_output_ut.cpp b/util/stream/zerocopy_output_ut.cpp
new file mode 100644
index 0000000000..e81f7fb056
--- /dev/null
+++ b/util/stream/zerocopy_output_ut.cpp
@@ -0,0 +1,48 @@
+#include "zerocopy_output.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/string.h>
+
+// This version of string output stream is written here only
+// for testing IZeroCopyOutput implementation of DoWrite.
+class TSimpleStringOutput: public IZeroCopyOutput {
+public:
+ TSimpleStringOutput(TString& s) noexcept
+ : S_(s)
+ {
+ }
+
+private:
+ size_t DoNext(void** ptr) override {
+ if (S_.size() == S_.capacity()) {
+ S_.reserve(FastClp2(S_.capacity() + 1));
+ }
+ size_t previousSize = S_.size();
+ S_.resize(S_.capacity());
+ *ptr = S_.begin() + previousSize;
+ return S_.size() - previousSize;
+ }
+
+ void DoUndo(size_t len) override {
+ Y_ENSURE(len <= S_.size());
+ S_.resize(S_.size() - len);
+ }
+
+ TString& S_;
+};
+
+Y_UNIT_TEST_SUITE(TestZerocopyOutput) {
+ Y_UNIT_TEST(Write) {
+ TString str;
+ TSimpleStringOutput output(str);
+ TString result;
+
+ for (size_t i = 0; i < 1000; ++i) {
+ result.push_back('a' + (i % 20));
+ }
+
+ output.Write(result.begin(), result.size());
+ UNIT_ASSERT_STRINGS_EQUAL(str, result);
+ }
+}
diff --git a/util/stream/zlib.cpp b/util/stream/zlib.cpp
new file mode 100644
index 0000000000..60f4e9439f
--- /dev/null
+++ b/util/stream/zlib.cpp
@@ -0,0 +1,380 @@
+#include "zlib.h"
+
+#include <util/memory/addstorage.h>
+#include <util/generic/scope.h>
+#include <util/generic/utility.h>
+
+#include <contrib/libs/zlib/zlib.h>
+
+#include <cstdio>
+#include <cstring>
+
+namespace {
+ static const int opts[] = {
+ //Auto
+ 15 + 32,
+ //ZLib
+ 15 + 0,
+ //GZip
+ 15 + 16,
+ //Raw
+ -15};
+
+ class TZLibCommon {
+ public:
+ inline TZLibCommon() noexcept {
+ memset(Z(), 0, sizeof(*Z()));
+ }
+
+ inline ~TZLibCommon() = default;
+
+ inline const char* GetErrMsg() const noexcept {
+ return Z()->msg != nullptr ? Z()->msg : "unknown error";
+ }
+
+ inline z_stream* Z() const noexcept {
+ return (z_stream*)(&Z_);
+ }
+
+ private:
+ z_stream Z_;
+ };
+
+ static inline ui32 MaxPortion(size_t s) noexcept {
+ return (ui32)Min<size_t>(Max<ui32>(), s);
+ }
+
+ struct TChunkedZeroCopyInput {
+ inline TChunkedZeroCopyInput(IZeroCopyInput* in)
+ : In(in)
+ , Buf(nullptr)
+ , Len(0)
+ {
+ }
+
+ template <class P, class T>
+ inline bool Next(P** buf, T* len) {
+ if (!Len) {
+ Len = In->Next(&Buf);
+ if (!Len) {
+ return false;
+ }
+ }
+
+ const T toread = (T)Min((size_t)Max<T>(), Len);
+
+ *len = toread;
+ *buf = (P*)Buf;
+
+ Buf += toread;
+ Len -= toread;
+
+ return true;
+ }
+
+ IZeroCopyInput* In;
+ const char* Buf;
+ size_t Len;
+ };
+}
+
+class TZLibDecompress::TImpl: private TZLibCommon, public TChunkedZeroCopyInput {
+public:
+ inline TImpl(IZeroCopyInput* in, ZLib::StreamType type, TStringBuf dict)
+ : TChunkedZeroCopyInput(in)
+ , Dict(dict)
+ {
+ if (inflateInit2(Z(), opts[type]) != Z_OK) {
+ ythrow TZLibDecompressorError() << "can not init inflate engine";
+ }
+
+ if (dict.size() && type == ZLib::Raw) {
+ SetDict();
+ }
+ }
+
+ virtual ~TImpl() {
+ inflateEnd(Z());
+ }
+
+ void SetAllowMultipleStreams(bool allowMultipleStreams) {
+ AllowMultipleStreams_ = allowMultipleStreams;
+ }
+
+ inline size_t Read(void* buf, size_t size) {
+ Z()->next_out = (unsigned char*)buf;
+ Z()->avail_out = size;
+
+ while (true) {
+ if (Z()->avail_in == 0) {
+ if (!FillInputBuffer()) {
+ return 0;
+ }
+ }
+
+ switch (inflate(Z(), Z_SYNC_FLUSH)) {
+ case Z_NEED_DICT: {
+ SetDict();
+ continue;
+ }
+
+ case Z_STREAM_END: {
+ if (AllowMultipleStreams_) {
+ if (inflateReset(Z()) != Z_OK) {
+ ythrow TZLibDecompressorError() << "inflate reset error(" << GetErrMsg() << ")";
+ }
+ } else {
+ return size - Z()->avail_out;
+ }
+
+ [[fallthrough]];
+ }
+
+ case Z_OK: {
+ const size_t processed = size - Z()->avail_out;
+
+ if (processed) {
+ return processed;
+ }
+
+ break;
+ }
+
+ default:
+ ythrow TZLibDecompressorError() << "inflate error(" << GetErrMsg() << ")";
+ }
+ }
+ }
+
+private:
+ inline bool FillInputBuffer() {
+ return Next(&Z()->next_in, &Z()->avail_in);
+ }
+
+ void SetDict() {
+ if (inflateSetDictionary(Z(), (const Bytef*)Dict.data(), Dict.size()) != Z_OK) {
+ ythrow TZLibCompressorError() << "can not set inflate dictionary";
+ }
+ }
+
+ bool AllowMultipleStreams_ = true;
+ TStringBuf Dict;
+};
+
+namespace {
+ class TDecompressStream: public IZeroCopyInput, public TZLibDecompress::TImpl, public TAdditionalStorage<TDecompressStream> {
+ public:
+ inline TDecompressStream(IInputStream* input, ZLib::StreamType type, TStringBuf dict)
+ : TZLibDecompress::TImpl(this, type, dict)
+ , Stream_(input)
+ {
+ }
+
+ ~TDecompressStream() override = default;
+
+ private:
+ size_t DoNext(const void** ptr, size_t len) override {
+ void* buf = AdditionalData();
+
+ *ptr = buf;
+ return Stream_->Read(buf, Min(len, AdditionalDataLength()));
+ }
+
+ private:
+ IInputStream* Stream_;
+ };
+
+ using TZeroCopyDecompress = TZLibDecompress::TImpl;
+}
+
+class TZLibCompress::TImpl: public TAdditionalStorage<TImpl>, private TZLibCommon {
+ static inline ZLib::StreamType Type(ZLib::StreamType type) {
+ if (type == ZLib::Auto) {
+ return ZLib::ZLib;
+ }
+
+ if (type >= ZLib::Invalid) {
+ ythrow TZLibError() << "invalid compression type: " << static_cast<unsigned long>(type);
+ }
+
+ return type;
+ }
+
+public:
+ inline TImpl(const TParams& p)
+ : Stream_(p.Out)
+ {
+ if (deflateInit2(Z(), Min<size_t>(9, p.CompressionLevel), Z_DEFLATED, opts[Type(p.Type)], 8, Z_DEFAULT_STRATEGY)) {
+ ythrow TZLibCompressorError() << "can not init inflate engine";
+ }
+
+ // Create exactly the same files on all platforms by fixing OS field in the header.
+ if (p.Type == ZLib::GZip) {
+ GZHeader_ = MakeHolder<gz_header>();
+ GZHeader_->os = 3; // UNIX
+ deflateSetHeader(Z(), GZHeader_.Get());
+ }
+
+ if (p.Dict.size()) {
+ if (deflateSetDictionary(Z(), (const Bytef*)p.Dict.data(), p.Dict.size())) {
+ ythrow TZLibCompressorError() << "can not set deflate dictionary";
+ }
+ }
+
+ Z()->next_out = TmpBuf();
+ Z()->avail_out = TmpBufLen();
+ }
+
+ inline ~TImpl() {
+ deflateEnd(Z());
+ }
+
+ inline void Write(const void* buf, size_t size) {
+ const Bytef* b = (const Bytef*)buf;
+ const Bytef* e = b + size;
+
+ Y_DEFER {
+ Z()->next_in = nullptr;
+ Z()->avail_in = 0;
+ };
+ do {
+ b = WritePart(b, e);
+ } while (b < e);
+ }
+
+ inline const Bytef* WritePart(const Bytef* b, const Bytef* e) {
+ Z()->next_in = const_cast<Bytef*>(b);
+ Z()->avail_in = MaxPortion(e - b);
+
+ while (Z()->avail_in) {
+ const int ret = deflate(Z(), Z_NO_FLUSH);
+
+ switch (ret) {
+ case Z_OK:
+ continue;
+
+ case Z_BUF_ERROR:
+ FlushBuffer();
+
+ break;
+
+ default:
+ ythrow TZLibCompressorError() << "deflate error(" << GetErrMsg() << ")";
+ }
+ }
+
+ return Z()->next_in;
+ }
+
+ inline void Flush() {
+ int ret = deflate(Z(), Z_SYNC_FLUSH);
+
+ while ((ret == Z_OK || ret == Z_BUF_ERROR) && !Z()->avail_out) {
+ FlushBuffer();
+ ret = deflate(Z(), Z_SYNC_FLUSH);
+ }
+
+ if (ret != Z_OK && ret != Z_BUF_ERROR) {
+ ythrow TZLibCompressorError() << "deflate flush error(" << GetErrMsg() << ")";
+ }
+
+ if (Z()->avail_out < TmpBufLen()) {
+ FlushBuffer();
+ }
+ }
+
+ inline void FlushBuffer() {
+ Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out);
+ Z()->next_out = TmpBuf();
+ Z()->avail_out = TmpBufLen();
+ }
+
+ inline void Finish() {
+ int ret = deflate(Z(), Z_FINISH);
+
+ while (ret == Z_OK || ret == Z_BUF_ERROR) {
+ FlushBuffer();
+ ret = deflate(Z(), Z_FINISH);
+ }
+
+ if (ret == Z_STREAM_END) {
+ Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out);
+ } else {
+ ythrow TZLibCompressorError() << "deflate finish error(" << GetErrMsg() << ")";
+ }
+ }
+
+private:
+ inline unsigned char* TmpBuf() noexcept {
+ return (unsigned char*)AdditionalData();
+ }
+
+ inline size_t TmpBufLen() const noexcept {
+ return AdditionalDataLength();
+ }
+
+private:
+ IOutputStream* Stream_;
+ THolder<gz_header> GZHeader_;
+};
+
+TZLibDecompress::TZLibDecompress(IZeroCopyInput* input, ZLib::StreamType type, TStringBuf dict)
+ : Impl_(new TZeroCopyDecompress(input, type, dict))
+{
+}
+
+TZLibDecompress::TZLibDecompress(IInputStream* input, ZLib::StreamType type, size_t buflen, TStringBuf dict)
+ : Impl_(new (buflen) TDecompressStream(input, type, dict))
+{
+}
+
+void TZLibDecompress::SetAllowMultipleStreams(bool allowMultipleStreams) {
+ Impl_->SetAllowMultipleStreams(allowMultipleStreams);
+}
+
+TZLibDecompress::~TZLibDecompress() = default;
+
+size_t TZLibDecompress::DoRead(void* buf, size_t size) {
+ return Impl_->Read(buf, MaxPortion(size));
+}
+
+void TZLibCompress::Init(const TParams& params) {
+ Y_ENSURE(params.BufLen >= 16, "ZLib buffer too small");
+ Impl_.Reset(new (params.BufLen) TImpl(params));
+}
+
+void TZLibCompress::TDestruct::Destroy(TImpl* impl) {
+ delete impl;
+}
+
+TZLibCompress::~TZLibCompress() {
+ try {
+ Finish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
+void TZLibCompress::DoWrite(const void* buf, size_t size) {
+ if (!Impl_) {
+ ythrow TZLibCompressorError() << "can not write to finished zlib stream";
+ }
+
+ Impl_->Write(buf, size);
+}
+
+void TZLibCompress::DoFlush() {
+ if (Impl_) {
+ Impl_->Flush();
+ }
+}
+
+void TZLibCompress::DoFinish() {
+ THolder<TImpl> impl(Impl_.Release());
+
+ if (impl) {
+ impl->Finish();
+ }
+}
+
+TBufferedZLibDecompress::~TBufferedZLibDecompress() = default;
diff --git a/util/stream/zlib.h b/util/stream/zlib.h
new file mode 100644
index 0000000000..e7de7c81b7
--- /dev/null
+++ b/util/stream/zlib.h
@@ -0,0 +1,173 @@
+#pragma once
+
+#include "fwd.h"
+#include "input.h"
+#include "output.h"
+#include "buffered.h"
+
+#include <util/system/defaults.h>
+#include <util/generic/ptr.h>
+#include <util/generic/yexception.h>
+
+/**
+ * @addtogroup Streams_Archs
+ * @{
+ */
+
+struct TZLibError: public yexception {
+};
+
+struct TZLibCompressorError: public TZLibError {
+};
+
+struct TZLibDecompressorError: public TZLibError {
+};
+
+namespace ZLib {
+ enum StreamType: ui8 {
+ Auto = 0, /**< Auto detect format. Can be used for decompression only. */
+ ZLib = 1,
+ GZip = 2,
+ Raw = 3,
+ Invalid = 4
+ };
+
+ enum {
+ ZLIB_BUF_LEN = 8 * 1024
+ };
+}
+
+/**
+ * Non-buffered ZLib decompressing stream.
+ *
+ * Please don't use `TZLibDecompress` if you read text data from stream using
+ * `ReadLine`, it is VERY slow (approx 10 times slower, according to synthetic
+ * benchmark). For fast buffered ZLib stream reading use `TBufferedZLibDecompress`
+ * aka `TZDecompress`.
+ */
+class TZLibDecompress: public IInputStream {
+public:
+ TZLibDecompress(IZeroCopyInput* input, ZLib::StreamType type = ZLib::Auto, TStringBuf dict = {});
+ TZLibDecompress(IInputStream* input, ZLib::StreamType type = ZLib::Auto, size_t buflen = ZLib::ZLIB_BUF_LEN,
+ TStringBuf dict = {});
+
+ /**
+ * Allows/disallows multiple sequential compressed streams. Allowed by default.
+ *
+ * If multiple streams are allowed, their decompressed content will be concatenated.
+ * If multiple streams are disabled, then only first stream is decompressed. After that end
+ * of IInputStream will have happen, i.e. method Read() will return 0.
+ *
+ * @param allowMultipleStreams - flag to allow (true) or disable (false) multiple streams.
+ */
+ void SetAllowMultipleStreams(bool allowMultipleStreams);
+
+ ~TZLibDecompress() override;
+
+protected:
+ size_t DoRead(void* buf, size_t size) override;
+
+public:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+/**
+ * Non-buffered ZLib compressing stream.
+ */
+class TZLibCompress: public IOutputStream {
+public:
+ struct TParams {
+ inline TParams(IOutputStream* out)
+ : Out(out)
+ , Type(ZLib::ZLib)
+ , CompressionLevel(6)
+ , BufLen(ZLib::ZLIB_BUF_LEN)
+ {
+ }
+
+ inline TParams& SetType(ZLib::StreamType type) noexcept {
+ Type = type;
+
+ return *this;
+ }
+
+ inline TParams& SetCompressionLevel(size_t level) noexcept {
+ CompressionLevel = level;
+
+ return *this;
+ }
+
+ inline TParams& SetBufLen(size_t buflen) noexcept {
+ BufLen = buflen;
+
+ return *this;
+ }
+
+ inline TParams& SetDict(const TStringBuf dict) noexcept {
+ Dict = dict;
+
+ return *this;
+ }
+
+ IOutputStream* Out;
+ ZLib::StreamType Type;
+ size_t CompressionLevel;
+ size_t BufLen;
+ TStringBuf Dict;
+ };
+
+ inline TZLibCompress(const TParams& params) {
+ Init(params);
+ }
+
+ inline TZLibCompress(IOutputStream* out, ZLib::StreamType type) {
+ Init(TParams(out).SetType(type));
+ }
+
+ inline TZLibCompress(IOutputStream* out, ZLib::StreamType type, size_t compression_level) {
+ Init(TParams(out).SetType(type).SetCompressionLevel(compression_level));
+ }
+
+ inline TZLibCompress(IOutputStream* out, ZLib::StreamType type, size_t compression_level, size_t buflen) {
+ Init(TParams(out).SetType(type).SetCompressionLevel(compression_level).SetBufLen(buflen));
+ }
+
+ ~TZLibCompress() override;
+
+private:
+ void Init(const TParams& opts);
+
+ void DoWrite(const void* buf, size_t size) override;
+ void DoFlush() override;
+ void DoFinish() override;
+
+public:
+ class TImpl;
+
+ /** To allow inline constructors. */
+ struct TDestruct {
+ static void Destroy(TImpl* impl);
+ };
+
+ THolder<TImpl, TDestruct> Impl_;
+};
+
+/**
+ * Buffered ZLib decompressing stream.
+ *
+ * Supports efficient `ReadLine` calls and similar "reading in small pieces"
+ * usage patterns.
+ */
+class TBufferedZLibDecompress: public TBuffered<TZLibDecompress> {
+public:
+ template <class T>
+ inline TBufferedZLibDecompress(T* in, ZLib::StreamType type = ZLib::Auto, size_t buf = 1 << 13)
+ : TBuffered<TZLibDecompress>(buf, in, type)
+ {
+ }
+
+ ~TBufferedZLibDecompress() override;
+};
+
+/** @} */
diff --git a/util/stream/zlib_ut.cpp b/util/stream/zlib_ut.cpp
new file mode 100644
index 0000000000..2290b4a9de
--- /dev/null
+++ b/util/stream/zlib_ut.cpp
@@ -0,0 +1,230 @@
+#include "zlib.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "file.h"
+#include <util/system/tempfile.h>
+#include <util/random/entropy.h>
+#include <util/random/random.h>
+
+#define ZDATA "./zdata"
+
+class TThrowingStream: public IOutputStream {
+public:
+ TThrowingStream(int limit)
+ : Limit_(limit)
+ {
+ }
+
+ void DoWrite(const void*, size_t size) override {
+ if (Ignore) {
+ return;
+ }
+
+ Limit_ -= size;
+ if (Limit_ < 0) {
+ throw yexception() << "catch this";
+ }
+ }
+
+ void DoFinish() override {
+ if (Ignore) {
+ return;
+ }
+ if (Limit_ < 0) {
+ throw yexception() << "catch this";
+ }
+ }
+
+ void DoFlush() override {
+ if (Ignore) {
+ return;
+ }
+ if (Limit_ < 0) {
+ throw yexception() << "catch this";
+ }
+ }
+
+ bool Ignore = false;
+
+private:
+ int Limit_;
+};
+
+Y_UNIT_TEST_SUITE(TZLibTest) {
+ static const TString DATA = "8s7d5vc6s5vc67sa4c65ascx6asd4xcv76adsfxv76s";
+ static const TString DATA2 = "cn8wk2bd9vb3vdfif83g1ks94bfiovtwv";
+
+ Y_UNIT_TEST(Compress) {
+ TUnbufferedFileOutput o(ZDATA);
+ TZLibCompress c(&o, ZLib::ZLib);
+
+ c.Write(DATA.data(), DATA.size());
+ c.Finish();
+ o.Finish();
+ }
+
+ Y_UNIT_TEST(Decompress) {
+ TTempFile tmpFile(ZDATA);
+
+ {
+ TUnbufferedFileInput i(ZDATA);
+ TZLibDecompress d(&i);
+
+ UNIT_ASSERT_EQUAL(d.ReadAll(), DATA);
+ }
+ }
+
+ Y_UNIT_TEST(Dictionary) {
+ static constexpr TStringBuf data = "<html><body></body></html>";
+ static constexpr TStringBuf dict = "</<html><body>";
+ for (auto type : {ZLib::Raw, ZLib::ZLib}) {
+ TStringStream compressed;
+ {
+ TZLibCompress compressor(TZLibCompress::TParams(&compressed).SetDict(dict).SetType(type));
+ compressor.Write(data);
+ }
+
+ TZLibDecompress decompressor(&compressed, type, ZLib::ZLIB_BUF_LEN, dict);
+ UNIT_ASSERT_STRINGS_EQUAL(decompressor.ReadAll(), data);
+ }
+ }
+
+ Y_UNIT_TEST(DecompressTwoStreams) {
+ // Check that Decompress(Compress(X) + Compress(Y)) == X + Y
+ TTempFile tmpFile(ZDATA);
+ {
+ TUnbufferedFileOutput o(ZDATA);
+ TZLibCompress c1(&o, ZLib::ZLib);
+ c1.Write(DATA.data(), DATA.size());
+ c1.Finish();
+ TZLibCompress c2(&o, ZLib::ZLib);
+ c2.Write(DATA2.data(), DATA2.size());
+ c2.Finish();
+ o.Finish();
+ }
+ {
+ TUnbufferedFileInput i(ZDATA);
+ TZLibDecompress d(&i);
+
+ UNIT_ASSERT_EQUAL(d.ReadAll(), DATA + DATA2);
+ }
+ }
+
+ Y_UNIT_TEST(CompressionExceptionSegfault) {
+ TVector<char> buf(512 * 1024);
+ EntropyPool().Load(buf.data(), buf.size());
+
+ TThrowingStream o(128 * 1024);
+ TZLibCompress c(&o, ZLib::GZip, 4, 1 << 15);
+ try {
+ c.Write(buf.data(), buf.size());
+ } catch (...) {
+ }
+
+ o.Ignore = true;
+ TVector<char>().swap(buf);
+ }
+
+ Y_UNIT_TEST(DecompressFirstOfTwoStreams) {
+ // Check that Decompress(Compress(X) + Compress(Y)) == X when single stream is allowed
+ TTempFile tmpFile(ZDATA);
+ {
+ TUnbufferedFileOutput o(ZDATA);
+ TZLibCompress c1(&o, ZLib::ZLib);
+ c1.Write(DATA.data(), DATA.size());
+ c1.Finish();
+ TZLibCompress c2(&o, ZLib::ZLib);
+ c2.Write(DATA2.data(), DATA2.size());
+ c2.Finish();
+ o.Finish();
+ }
+ {
+ TUnbufferedFileInput i(ZDATA);
+ TZLibDecompress d(&i);
+ d.SetAllowMultipleStreams(false);
+
+ UNIT_ASSERT_EQUAL(d.ReadAll(), DATA);
+ }
+ }
+
+ Y_UNIT_TEST(CompressFlush) {
+ TString data = "";
+
+ for (size_t i = 0; i < 32; ++i) {
+ TTempFile tmpFile(ZDATA);
+
+ TUnbufferedFileOutput output(ZDATA);
+ TZLibCompress compressor(&output, ZLib::ZLib);
+
+ compressor.Write(data.data(), data.size());
+ compressor.Flush();
+
+ {
+ TUnbufferedFileInput input(ZDATA);
+ TZLibDecompress decompressor(&input);
+
+ UNIT_ASSERT_EQUAL(decompressor.ReadAll(), data);
+ }
+
+ data += 'A' + i;
+ }
+ }
+
+ Y_UNIT_TEST(CompressEmptyFlush) {
+ TTempFile tmpFile(ZDATA);
+
+ TUnbufferedFileOutput output(ZDATA);
+ TZLibCompress compressor(&output, ZLib::ZLib);
+
+ TUnbufferedFileInput input(ZDATA);
+
+ compressor.Write(DATA.data(), DATA.size());
+ compressor.Flush();
+
+ {
+ TZLibDecompress decompressor(&input);
+ UNIT_ASSERT_EQUAL(decompressor.ReadAll(), DATA);
+ }
+
+ for (size_t i = 0; i < 10; ++i) {
+ compressor.Flush();
+ }
+
+ UNIT_ASSERT_EQUAL(input.ReadAll(), "");
+ }
+
+ Y_UNIT_TEST(CompressFlushSmallBuffer) {
+ for (size_t bufferSize = 16; bufferSize < 32; ++bufferSize) {
+ TString firstData = "";
+
+ for (size_t firstDataSize = 0; firstDataSize < 16; ++firstDataSize) {
+ TString secondData = "";
+
+ for (size_t secondDataSize = 0; secondDataSize < 16; ++secondDataSize) {
+ TTempFile tmpFile(ZDATA);
+
+ TUnbufferedFileOutput output(ZDATA);
+ TZLibCompress compressor(TZLibCompress::TParams(&output).SetType(ZLib::ZLib).SetBufLen(bufferSize));
+
+ TUnbufferedFileInput input(ZDATA);
+ TZLibDecompress decompressor(&input);
+
+ compressor.Write(firstData.data(), firstData.size());
+ compressor.Flush();
+
+ UNIT_ASSERT_EQUAL(decompressor.ReadAll(), firstData);
+
+ compressor.Write(secondData.data(), secondData.size());
+ compressor.Flush();
+
+ UNIT_ASSERT_EQUAL(decompressor.ReadAll(), secondData);
+
+ secondData += 'A' + secondDataSize;
+ }
+
+ firstData += 'A' + firstDataSize;
+ }
+ }
+ }
+}