diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/stream | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/stream')
80 files changed, 8384 insertions, 0 deletions
diff --git a/util/stream/aligned.cpp b/util/stream/aligned.cpp new file mode 100644 index 0000000000..2fd12d15b7 --- /dev/null +++ b/util/stream/aligned.cpp @@ -0,0 +1,30 @@ +#include "aligned.h" + +// Reads from the wrapped stream and advances Position_ by the number of bytes read. +size_t TAlignedInput::DoRead(void* ptr, size_t len) { + size_t ret = Stream_->Read(ptr, len); + Position_ += ret; + return ret; +} + +// Skips in the wrapped stream and advances Position_ by the number of bytes skipped. +size_t TAlignedInput::DoSkip(size_t len) { + size_t ret = Stream_->Skip(len); + Position_ += ret; + return ret; +} + +// Delegates ReadTo to the wrapped stream and advances Position_ by the value it returns. +size_t TAlignedInput::DoReadTo(TString& st, char ch) { + size_t ret = Stream_->ReadTo(st, ch); + Position_ += ret; + return ret; +} + +// Delegates ReadAll to the wrapped stream and advances Position_ by the value it returns. +ui64 TAlignedInput::DoReadAll(IOutputStream& out) { + ui64 ret = Stream_->ReadAll(out); + Position_ += ret; + return ret; +} + +// Writes to the wrapped stream and advances Position_ by len. +void TAlignedOutput::DoWrite(const void* ptr, size_t len) { + Stream_->Write(ptr, len); + Position_ += len; +} diff --git a/util/stream/aligned.h b/util/stream/aligned.h new file mode 100644 index 0000000000..70e7be05a9 --- /dev/null +++ b/util/stream/aligned.h @@ -0,0 +1,99 @@ +#pragma once + +#include "input.h" +#include "output.h" + +#include <util/system/yassert.h> +#include <util/generic/bitops.h> + +/** + * @addtogroup Streams + * @{ + */ + +/** + * Proxy input stream that provides additional functions that make reading + * aligned data easier. + */ +class TAlignedInput: public IInputStream { +public: + TAlignedInput(IInputStream* s) + : Stream_(s) + , Position_(0) + { + } + + /** + * Ensures alignment of the position in the input stream by skipping + * some input. + * + * @param alignment Alignment. Must be a power of 2. 
+ */ + void Align(size_t alignment = sizeof(void*)) { + Y_ASSERT(IsPowerOf2(alignment)); + + if (Position_ & (alignment - 1)) { + // Number of bytes that separate the current position from the next aligned one. + size_t len = alignment - (Position_ & (alignment - 1)); + + while (len) { + const size_t skipped = DoSkip(len); + + if (skipped == 0) { + // The wrapped stream made no progress (end of stream): stop instead of + // looping forever. The position is left unaligned in this case. + break; + } + + len -= skipped; + } + } + } + +private: + size_t DoRead(void* ptr, size_t len) override; + size_t DoSkip(size_t len) override; + size_t DoReadTo(TString& st, char ch) override; + ui64 DoReadAll(IOutputStream& out) override; + +private: + IInputStream* Stream_; + ui64 Position_; +}; + +/** + * Proxy output stream that provides additional functions that make writing + * aligned data easier. + */ +class TAlignedOutput: public IOutputStream { +public: + TAlignedOutput(IOutputStream* s) + : Stream_(s) + , Position_(0) + { + } + + TAlignedOutput(TAlignedOutput&&) noexcept = default; + TAlignedOutput& operator=(TAlignedOutput&&) noexcept = default; + + size_t GetCurrentOffset() const { + return Position_; + } + + /** + * Ensures alignment of the position in the output stream by writing + * some data. + * + * @param alignment Alignment. Must be a power of 2. 
+ */ + void Align(size_t alignment = sizeof(void*)) { + Y_ASSERT(IsPowerOf2(alignment)); + + static char unused[sizeof(void*) * 2]; + Y_ASSERT(alignment <= sizeof(unused)); + + if (Position_ & (alignment - 1)) { + DoWrite(unused, alignment - (Position_ & (alignment - 1))); + } + } + +private: + void DoWrite(const void* ptr, size_t len) override; + +private: + IOutputStream* Stream_; + ui64 Position_; +}; + +/** @} */ diff --git a/util/stream/aligned_ut.cpp b/util/stream/aligned_ut.cpp new file mode 100644 index 0000000000..e980d05cf7 --- /dev/null +++ b/util/stream/aligned_ut.cpp @@ -0,0 +1,63 @@ +#include "aligned.h" + +#include <library/cpp/testing/unittest/registar.h> + +class TNastyInputStream: public IInputStream { +public: + TNastyInputStream() + : Pos_(0) + { + } + +protected: + size_t DoRead(void* buf, size_t len) override { + if (len == 0) { + return 0; + } + + *static_cast<unsigned char*>(buf) = static_cast<unsigned char>(Pos_); + ++Pos_; + return 1; + } + + size_t DoSkip(size_t len) override { + if (len == 0) { + return 0; + } + + ++Pos_; + return 1; + } + +private: + size_t Pos_; +}; + +Y_UNIT_TEST_SUITE(TAlignedTest) { + Y_UNIT_TEST(AlignInput) { + TNastyInputStream input0; + TAlignedInput alignedInput(&input0); + + char c = '\1'; + + alignedInput.Align(2); + alignedInput.ReadChar(c); + UNIT_ASSERT_VALUES_EQUAL(c, '\x0'); + + alignedInput.Align(2); + alignedInput.ReadChar(c); + UNIT_ASSERT_VALUES_EQUAL(c, '\x2'); + + alignedInput.Align(4); + alignedInput.ReadChar(c); + UNIT_ASSERT_VALUES_EQUAL(c, '\x4'); + + alignedInput.Align(16); + alignedInput.ReadChar(c); + UNIT_ASSERT_VALUES_EQUAL(c, '\x10'); + + alignedInput.Align(128); + alignedInput.ReadChar(c); + UNIT_ASSERT_VALUES_EQUAL(c, '\x80'); + } +} diff --git a/util/stream/buffer.cpp b/util/stream/buffer.cpp new file mode 100644 index 0000000000..2facece4ea --- /dev/null +++ b/util/stream/buffer.cpp @@ -0,0 +1,120 @@ +#include "buffer.h" +#include <util/generic/buffer.h> +#include 
<util/generic/yexception.h> + +// Implementation of TBufferOutput that writes into a TBuffer held by reference. +class TBufferOutput::TImpl { +public: + inline TImpl(TBuffer& buf) + : Data_(buf) + { + } + + virtual ~TImpl() = default; + + // Grows the buffer when no space is available, then resizes it to its full capacity + // and returns the newly exposed region through ptr together with its length. + inline size_t DoNext(void** ptr) { + if (Data_.Avail() == 0) { + Data_.Reserve(FastClp2(Data_.Capacity() + MinBufferGrowSize)); + } + size_t previousSize = Data_.size(); + Data_.Resize(Data_.Capacity()); + *ptr = Data_.Begin() + previousSize; + return Data_.Size() - previousSize; + } + + // Shrinks the buffer by len bytes; Y_VERIFY rejects len larger than the current size. + inline void DoUndo(size_t len) { + Y_VERIFY(len <= Data_.Size(), "trying to undo more bytes than actually written"); + Data_.Resize(Data_.size() - len); + } + + inline void DoWrite(const void* buf, size_t len) { + Data_.Append((const char*)buf, len); + } + + inline void DoWriteC(char c) { + Data_.Append(c); + } + + inline TBuffer& Buffer() const noexcept { + return Data_; + } + +private: + TBuffer& Data_; + static constexpr size_t MinBufferGrowSize = 16; +}; + +namespace { + using TImpl = TBufferOutput::TImpl; + + // TImpl variant that carries its own buffer: the TBuffer base is declared first, so it + // is constructed before the TImpl base that stores a reference to it. + class TOwnedImpl: private TBuffer, public TImpl { + public: + inline TOwnedImpl(size_t buflen) + : TBuffer(buflen) + , TImpl(static_cast<TBuffer&>(*this)) + { + } + }; +} + +TBufferOutput::TBufferOutput(size_t buflen) + : Impl_(new TOwnedImpl(buflen)) +{ +} + +TBufferOutput::TBufferOutput(TBuffer& buffer) + : Impl_(new TImpl(buffer)) +{ +} + +TBufferOutput::TBufferOutput(TBufferOutput&&) noexcept = default; +TBufferOutput& TBufferOutput::operator=(TBufferOutput&&) noexcept = default; + +TBufferOutput::~TBufferOutput() = default; + +// The stream methods below simply forward to the implementation object. +TBuffer& TBufferOutput::Buffer() const noexcept { + return Impl_->Buffer(); +} + +size_t TBufferOutput::DoNext(void** ptr) { + return Impl_->DoNext(ptr); +} + +void TBufferOutput::DoUndo(size_t len) { + Impl_->DoUndo(len); +} + +void TBufferOutput::DoWrite(const void* buf, size_t len) { + Impl_->DoWrite(buf, len); +} + +void TBufferOutput::DoWriteC(char c) { + Impl_->DoWriteC(c); +} + +TBufferInput::TBufferInput(const TBuffer& buffer) + : Buf_(buffer) + , Readed_(0) +{ +} + 
+TBufferInput::~TBufferInput() = default; + +const TBuffer& TBufferInput::Buffer() const noexcept { + return Buf_; +} + +// Moves the read position back to the beginning of the buffer. +void TBufferInput::Rewind() noexcept { + Readed_ = 0; +} + +// Returns through ptr the unread part of the buffer, limited to len bytes, and marks +// the returned bytes as read. +size_t TBufferInput::DoNext(const void** ptr, size_t len) { + len = Min(Buf_.Size() - Readed_, len); + *ptr = Buf_.data() + Readed_; + Readed_ += len; + return len; +} + +// Moves the read position back by len bytes; Y_VERIFY rejects len larger than the +// number of bytes read so far. +void TBufferInput::DoUndo(size_t len) { + Y_VERIFY(len <= Readed_); + Readed_ -= len; +} diff --git a/util/stream/buffer.h b/util/stream/buffer.h new file mode 100644 index 0000000000..9dc99dbe49 --- /dev/null +++ b/util/stream/buffer.h @@ -0,0 +1,119 @@ +#pragma once + +#include "zerocopy.h" +#include "zerocopy_output.h" + +#include <util/generic/ptr.h> + +class TBuffer; + +/** + * @addtogroup Streams_Buffers + * @{ + */ + +/** + * Output stream that writes into a `TBuffer`. + */ +class TBufferOutput: public IZeroCopyOutput { +public: + class TImpl; + + /** + * Constructs a stream that writes into an internal buffer. + * + * @param buflen Initial size of the internal buffer. + */ + TBufferOutput(size_t buflen = 1024); + + /** + * Constructs a stream that writes into the provided buffer. It's up to the + * user to make sure that the buffer doesn't get destroyed while this stream + * is in use. + * + * @param buffer Buffer to write into. + */ + TBufferOutput(TBuffer& buffer); + + TBufferOutput(TBufferOutput&&) noexcept; + TBufferOutput& operator=(TBufferOutput&&) noexcept; + + ~TBufferOutput() override; + + /** + * @returns Buffer that this stream writes into. + */ + TBuffer& Buffer() const noexcept; + +private: + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; + void DoWrite(const void* buf, size_t len) override; + void DoWriteC(char c) override; + +private: + THolder<TImpl> Impl_; +}; + +/** + * Input stream that reads from an external `TBuffer`. + */ +class TBufferInput: public IZeroCopyInputFastReadTo { +public: + /** + * Constructs a stream that reads from an external buffer. 
It's up to the + * user to make sure that the buffer doesn't get destroyed before this + * stream. + * + * @param buffer External buffer to read from. + */ + TBufferInput(const TBuffer& buffer); + + ~TBufferInput() override; + + /** + * @returns Buffer that this stream reads from. + */ + const TBuffer& Buffer() const noexcept; + + /** + * Moves the read position back to the beginning of the buffer. + */ + void Rewind() noexcept; + +protected: + size_t DoNext(const void** ptr, size_t len) override; + void DoUndo(size_t len) override; + +private: + const TBuffer& Buf_; + size_t Readed_; +}; + +/** + * Input/output stream that works with a `TBuffer`. + */ +class TBufferStream: public TBufferOutput, public TBufferInput { +public: + /** + * Constructs a stream that works with an internal buffer. + * + * @param buflen Initial size of the internal buffer. + */ + inline TBufferStream(size_t buflen = 1024) + : TBufferOutput(buflen) + , TBufferInput(TBufferOutput::Buffer()) + { + } + + /** + * Constructs a stream that works with the provided buffer. + * + * @param buffer Buffer to work with. + */ + inline TBufferStream(TBuffer& buffer) + : TBufferOutput(buffer) + , TBufferInput(TBufferOutput::Buffer()) + { + } + + ~TBufferStream() override = default; + + // Both bases declare Buffer(); expose the one from the output side. + using TBufferOutput::Buffer; +}; + +/** @} */ diff --git a/util/stream/buffer_ut.cpp b/util/stream/buffer_ut.cpp new file mode 100644 index 0000000000..3494696190 --- /dev/null +++ b/util/stream/buffer_ut.cpp @@ -0,0 +1,85 @@ +#include "buffer.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/buffer.h> + +#include <cstring> + +#include "str.h" + +Y_UNIT_TEST_SUITE(TBufferTest) { + // Data left after a Skip must be transferred in full. + Y_UNIT_TEST(Transfer) { + TBuffer buffer("razrazraz", 9); + TBufferInput input(buffer); + + input.Skip(3); + + TStringStream output; + TransferData(&input, &output); + + UNIT_ASSERT_VALUES_EQUAL(output.Str(), "razraz"); + } + + // ReadTo must report the bytes consumed and return the text before the delimiter. + Y_UNIT_TEST(ReadTo) { + TBuffer buffer("1234567890", 10); + TBufferInput input(buffer); + + TString tmp; + UNIT_ASSERT_VALUES_EQUAL(input.ReadTo(tmp, '3'), 3); + UNIT_ASSERT_VALUES_EQUAL(tmp, "12"); + + 
UNIT_ASSERT_VALUES_EQUAL(input.ReadTo(tmp, 'z'), 7); + UNIT_ASSERT_VALUES_EQUAL(tmp, "4567890"); + } + + Y_UNIT_TEST(WriteViaNextAndUndo) { + TBuffer buffer; + TBufferOutput output(buffer); + TString str; + + for (size_t i = 0; i < 10000; ++i) { + str.push_back('a' + (i % 20)); + } + + size_t written = 0; + void* ptr = nullptr; + while (written < str.size()) { + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT(ptr && bufferSize > 0); + size_t toWrite = Min(bufferSize, str.size() - written); + memcpy(ptr, str.begin() + written, toWrite); + written += toWrite; + if (toWrite < bufferSize) { + output.Undo(bufferSize - toWrite); + } + } + + UNIT_ASSERT(0 == memcmp(buffer.data(), str.begin(), buffer.size())); + } + + Y_UNIT_TEST(Write) { + TBuffer buffer; + TBufferOutput output(buffer); + output << "1" + << "22" + << "333" + << "4444" + << "55555"; + + UNIT_ASSERT(0 == memcmp(buffer.data(), "1" + "22" + "333" + "4444" + "55555", + buffer.size())); + } + + Y_UNIT_TEST(WriteChars) { + TBuffer buffer; + TBufferOutput output(buffer); + output << '1' << '2' << '3' << '4' << '5' << '6' << '7' << '8' << '9' << '0'; + + UNIT_ASSERT(0 == memcmp(buffer.data(), "1234567890", buffer.size())); + } +} diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp new file mode 100644 index 0000000000..a00e592e1c --- /dev/null +++ b/util/stream/buffered.cpp @@ -0,0 +1,428 @@ +#include "mem.h" +#include "buffered.h" + +#include <util/memory/addstorage.h> +#include <util/generic/yexception.h> +#include <util/generic/buffer.h> + +class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> { +public: + inline TImpl(IInputStream* slave) + : Slave_(slave) + , MemInput_(nullptr, 0) + { + } + + inline ~TImpl() = default; + + inline size_t Next(const void** ptr, size_t len) { + if (MemInput_.Exhausted()) { + MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); + } + + return MemInput_.Next(ptr, len); + } + + inline size_t Read(void* buf, size_t len) { + if (MemInput_.Exhausted()) { + 
if (len > BufLen() / 2) { + // Nothing is buffered and the request is large: read straight from the slave. + return Slave_->Read(buf, len); + } + + MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); + } + + return MemInput_.Read(buf, len); + } + + // Repeats DoSkip until len bytes are skipped or a call skips nothing. + inline size_t Skip(size_t len) { + size_t totalSkipped = 0; + while (len) { + const size_t skipped = DoSkip(len); + if (skipped == 0) { + break; + } + + totalSkipped += skipped; + len -= skipped; + } + + return totalSkipped; + } + + // Single skip step: with an exhausted buffer a large skip is delegated to the slave, + // otherwise the buffer is refilled; buffered data is then skipped. + inline size_t DoSkip(size_t len) { + if (MemInput_.Exhausted()) { + if (len > BufLen() / 2) { + return Slave_->Skip(len); + } + + MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); + } + + return MemInput_.Skip(len); + } + + // Accumulates data into st chunk by chunk, refilling the buffer from the slave when it + // is exhausted. Stops when the slave returns no data or when a chunk yields fewer + // characters than were available in the buffer. Returns the sum of the values returned + // by the per-chunk ReadTo calls. + inline size_t ReadTo(TString& st, char to) { + st.clear(); + + TString s_tmp; + + size_t ret = 0; + + while (true) { + if (MemInput_.Exhausted()) { + const size_t bytesRead = Slave_->Read(Buf(), BufLen()); + + if (!bytesRead) { + break; + } + + MemInput_.Reset(Buf(), bytesRead); + } + + const size_t a_len(MemInput_.Avail()); + size_t s_len = 0; + if (st.empty()) { + ret += MemInput_.ReadTo(st, to); + s_len = st.length(); + } else { + ret += MemInput_.ReadTo(s_tmp, to); + s_len = s_tmp.length(); + st.append(s_tmp); + } + + if (s_len != a_len) { + break; + } + } + + return ret; + } + + // Replaces the slave stream; already buffered data is kept. + inline void Reset(IInputStream* slave) { + Slave_ = slave; + } + +private: + inline size_t BufLen() const noexcept { + return AdditionalDataLength(); + } + + inline void* Buf() const noexcept { + return AdditionalData(); + } + +private: + IInputStream* Slave_; + TMemoryInput MemInput_; +}; + +TBufferedInput::TBufferedInput(IInputStream* slave, size_t buflen) + : Impl_(new (buflen) TImpl(slave)) +{ +} + +TBufferedInput::TBufferedInput(TBufferedInput&&) noexcept = default; +TBufferedInput& TBufferedInput::operator=(TBufferedInput&&) noexcept = default; + +TBufferedInput::~TBufferedInput() = default; + +size_t TBufferedInput::DoRead(void* buf, size_t len) { + return Impl_->Read(buf, len); +} + +size_t TBufferedInput::DoSkip(size_t len) { + return Impl_->Skip(len); +} 
+ +size_t TBufferedInput::DoNext(const void** ptr, size_t len) { + return Impl_->Next(ptr, len); +} + +size_t TBufferedInput::DoReadTo(TString& st, char ch) { + return Impl_->ReadTo(st, ch); +} + +void TBufferedInput::Reset(IInputStream* slave) { + Impl_->Reset(slave); +} + +// Common buffering logic for buffered output streams; the storage itself is supplied by +// subclasses through Buf(), Len() and OnBufferExhausted(). +class TBufferedOutputBase::TImpl { +public: + inline TImpl(IOutputStream* slave) + : Slave_(slave) + , MemOut_(nullptr, 0) + , PropagateFlush_(false) + , PropagateFinish_(false) + { + } + + virtual ~TImpl() = default; + + // Points the memory output at the start of the (possibly replaced) buffer. + inline void Reset() { + MemOut_.Reset(Buf(), Len()); + } + + // Writes the buffer out to the slave when it is full, then exposes free buffer space. + inline size_t Next(void** ptr) { + if (MemOut_.Avail() == 0) { + Slave_->Write(Buf(), Stored()); + OnBufferExhausted(); + Reset(); + } + + return MemOut_.Next(ptr); + } + + inline void Undo(size_t len) { + Y_VERIFY(len <= Stored(), "trying to undo more bytes than actually written"); + MemOut_.Undo(len); + } + + // Data that fits is copied into the buffer. Otherwise the stored bytes and the leading + // part of the new data (rounded so that the total is a multiple of the buffer length) + // are passed to the slave in one call, and the remainder is kept in the buffer. + inline void Write(const void* buf, size_t len) { + if (len <= MemOut_.Avail()) { + /* + * fast path + */ + + MemOut_.Write(buf, len); + } else { + const size_t stored = Stored(); + const size_t full_len = stored + len; + const size_t good_len = DownToBufferGranularity(full_len); + const size_t write_from_buf = good_len - stored; + + using TPart = IOutputStream::TPart; + + alignas(TPart) char data[2 * sizeof(TPart)]; + TPart* parts = reinterpret_cast<TPart*>(data); + TPart* end = parts; + + if (stored) { + new (end++) TPart(Buf(), stored); + } + + if (write_from_buf) { + new (end++) TPart(buf, write_from_buf); + } + + Slave_->Write(parts, end - parts); + + //grow buffer only on full flushes + OnBufferExhausted(); + Reset(); + + if (write_from_buf < len) { + MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf); + } + } + } + + inline void Write(char c) { + if (Y_UNLIKELY(MemOut_.Avail() == 0)) { + Slave_->Write(Buf(), Stored()); + OnBufferExhausted(); + Reset(); + } + + MemOut_.Write(c); + } + + inline void SetFlushPropagateMode(bool mode) noexcept { + PropagateFlush_ = mode; + } + + inline 
void SetFinishPropagateMode(bool mode) noexcept { + PropagateFinish_ = mode; + } + + // Writes the buffered bytes to the slave and, if enabled, flushes the slave as well. + inline void Flush() { + { + Slave_->Write(Buf(), Stored()); + Reset(); + } + + if (PropagateFlush_) { + Slave_->Flush(); + } + } + + // Flushes, then runs DoFinish. If the flush throws, DoFinish is still attempted (its + // own exception is dropped) and the flush exception is rethrown. + inline void Finish() { + try { + Flush(); + } catch (...) { + try { + DoFinish(); + } catch (...) { + // ¯\_(ツ)_/¯ + } + + throw; + } + + DoFinish(); + } + +private: + inline void DoFinish() { + if (PropagateFinish_) { + Slave_->Finish(); + } + } + + inline size_t Stored() const noexcept { + return Len() - MemOut_.Avail(); + } + + // Rounds l down to a multiple of the buffer length. + inline size_t DownToBufferGranularity(size_t l) const noexcept { + return l - (l % Len()); + } + + virtual void OnBufferExhausted() = 0; + virtual void* Buf() const noexcept = 0; + virtual size_t Len() const noexcept = 0; + +private: + IOutputStream* Slave_; + TMemoryOutput MemOut_; + bool PropagateFlush_; + bool PropagateFinish_; +}; + +namespace { + // Fixed-size variant: the buffer is the additional storage allocated with the object. + struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> { + inline TSimpleImpl(IOutputStream* slave) + : TBufferedOutputBase::TImpl(slave) + { + Reset(); + } + + ~TSimpleImpl() override = default; + + void OnBufferExhausted() final { + } + + void* Buf() const noexcept override { + return AdditionalData(); + } + + size_t Len() const noexcept override { + return AdditionalDataLength(); + } + }; + + // Growing variant: starts with Step bytes of capacity and enlarges the buffer as more + // buffer-exhausted events are seen. + struct TAdaptiveImpl: public TBufferedOutputBase::TImpl { + enum { + Step = 4096 + }; + + inline TAdaptiveImpl(IOutputStream* slave) + : TBufferedOutputBase::TImpl(slave) + , N_(0) + { + B_.Reserve(Step); + Reset(); + } + + ~TAdaptiveImpl() override = default; + + // The wanted capacity is Step shifted left by (number of calls / 32), with the shift + // capped at 10. The buffer is swapped for a fresh one only when it has to grow. + void OnBufferExhausted() final { + const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10); + + if (c > B_.Capacity()) { + TBuffer(c).Swap(B_); + } + } + + void* Buf() const noexcept override { + return (void*)B_.Data(); + } + + size_t Len() const noexcept override { + return B_.Capacity(); + } + + TBuffer B_; + ui64 N_; + }; +} + 
+TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave) + : Impl_(new TAdaptiveImpl(slave)) +{ +} + +TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave, size_t buflen) + : Impl_(new (buflen) TSimpleImpl(slave)) +{ +} + +TBufferedOutputBase::TBufferedOutputBase(TBufferedOutputBase&&) noexcept = default; +TBufferedOutputBase& TBufferedOutputBase::operator=(TBufferedOutputBase&&) noexcept = default; + +// Finishes the stream on destruction; any exception raised while doing so is swallowed. +TBufferedOutputBase::~TBufferedOutputBase() { + try { + Finish(); + } catch (...) { + // ¯\_(ツ)_/¯ + } +} + +// Writing operations require a live implementation, i.e. a stream that has not been +// finished yet; Y_ENSURE reports the violation otherwise. +size_t TBufferedOutputBase::DoNext(void** ptr) { + Y_ENSURE(Impl_.Get(), "cannot call next in finished stream"); + return Impl_->Next(ptr); +} + +void TBufferedOutputBase::DoUndo(size_t len) { + Y_ENSURE(Impl_.Get(), "cannot call undo in finished stream"); + Impl_->Undo(len); +} + +void TBufferedOutputBase::DoWrite(const void* data, size_t len) { + Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); + Impl_->Write(data, len); +} + +void TBufferedOutputBase::DoWriteC(char c) { + Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); + Impl_->Write(c); +} + +// Flushing a finished stream is a no-op. +void TBufferedOutputBase::DoFlush() { + if (Impl_.Get()) { + Impl_->Flush(); + } +} + +// Takes the implementation out of Impl_ before finishing it, so Impl_ is already empty +// when Finish() runs and the implementation is destroyed afterwards even if it throws. +void TBufferedOutputBase::DoFinish() { + THolder<TImpl> impl(Impl_.Release()); + + if (impl) { + impl->Finish(); + } +} + +void TBufferedOutputBase::SetFlushPropagateMode(bool propagate) noexcept { + if (Impl_.Get()) { + Impl_->SetFlushPropagateMode(propagate); + } +} + +void TBufferedOutputBase::SetFinishPropagateMode(bool propagate) noexcept { + if (Impl_.Get()) { + Impl_->SetFinishPropagateMode(propagate); + } +} + +TBufferedOutput::TBufferedOutput(IOutputStream* slave, size_t buflen) + : TBufferedOutputBase(slave, buflen) +{ +} + +TBufferedOutput::~TBufferedOutput() = default; + +TAdaptiveBufferedOutput::TAdaptiveBufferedOutput(IOutputStream* slave) + : TBufferedOutputBase(slave) +{ +} + +TAdaptiveBufferedOutput::~TAdaptiveBufferedOutput() = default; diff --git a/util/stream/buffered.h 
b/util/stream/buffered.h new file mode 100644 index 0000000000..0847186141 --- /dev/null +++ b/util/stream/buffered.h @@ -0,0 +1,235 @@ +#pragma once + +#include "zerocopy.h" +#include "zerocopy_output.h" + +#include <utility> +#include <util/generic/ptr.h> +#include <util/generic/typetraits.h> +#include <util/generic/store_policy.h> + +/** + * @addtogroup Streams_Buffered + * @{ + */ + +/** + * Input stream that wraps the given stream and adds a buffer on top of it, + * thus making sure that data is read from the underlying stream in big chunks. + * + * Note that it does not claim ownership of the underlying stream, so it's up + * to the user to free it. + */ +class TBufferedInput: public IZeroCopyInput { +public: + /** + * @param slave Underlying stream. + * @param buflen Size of the buffer. + */ + TBufferedInput(IInputStream* slave, size_t buflen = 8192); + + TBufferedInput(TBufferedInput&&) noexcept; + TBufferedInput& operator=(TBufferedInput&&) noexcept; + + ~TBufferedInput() override; + + /** + * Switches the underlying stream to the one provided. Does not clear the + * data that was already buffered. + * + * @param slave New underlying stream. + */ + void Reset(IInputStream* slave); + +protected: + size_t DoRead(void* buf, size_t len) override; + size_t DoReadTo(TString& st, char ch) override; + size_t DoSkip(size_t len) override; + size_t DoNext(const void** ptr, size_t len) override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +/** + * Output stream that wraps the given stream and adds a buffer on top of it, + * thus making sure that data is written to the underlying stream in big chunks. + * + * Note that by default this stream does not propagate `Flush` and `Finish` + * calls to the underlying stream, instead simply flushing out the buffer. + * You can change this behavior by using propagation mode setters. + * + * Also note that this stream does not claim ownership of the underlying stream, + * so it's up to the user to free it. 
+ */ +class TBufferedOutputBase: public IZeroCopyOutput { +public: + /** + * Constructs a buffered stream that dynamically adjusts the size of the + * buffer. This works best when the amount of data that will be passed + * through this stream is not known and can range in size from several + * kilobytes to several gigabytes. + * + * @param slave Underlying stream. + */ + TBufferedOutputBase(IOutputStream* slave); + + /** + * Constructs a buffered stream with the given size of the buffer. + * + * @param slave Underlying stream. + * @param buflen Size of the buffer. + */ + TBufferedOutputBase(IOutputStream* slave, size_t buflen); + + TBufferedOutputBase(TBufferedOutputBase&&) noexcept; + TBufferedOutputBase& operator=(TBufferedOutputBase&&) noexcept; + + ~TBufferedOutputBase() override; + + /** + * @param propagate Whether `Flush` and `Finish` calls should + * be propagated to the underlying stream. + * By default they are not. + */ + inline void SetPropagateMode(bool propagate) noexcept { + SetFlushPropagateMode(propagate); + SetFinishPropagateMode(propagate); + } + + /** + * @param propagate Whether `Flush` calls should be propagated + * to the underlying stream. By default they + * are not. + */ + void SetFlushPropagateMode(bool propagate) noexcept; + + /** + * @param propagate Whether `Finish` calls should be propagated + * to the underlying stream. By default they + * are not. + */ + void SetFinishPropagateMode(bool propagate) noexcept; + + // Buffering implementation; only declared here, the definition is not part of this header. + class TImpl; + +protected: + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; + void DoWrite(const void* data, size_t len) override; + void DoWriteC(char c) override; + void DoFlush() override; + void DoFinish() override; + +private: + THolder<TImpl> Impl_; +}; + +/** + * Buffered output stream with a fixed-size buffer. 
+ * + * @see TBufferedOutputBase + */ +class TBufferedOutput: public TBufferedOutputBase { +public: + TBufferedOutput(IOutputStream* slave, size_t buflen = 8192); + ~TBufferedOutput() override; + + TBufferedOutput(TBufferedOutput&&) noexcept = default; + TBufferedOutput& operator=(TBufferedOutput&&) noexcept = default; +}; + +/** + * Buffered output stream that dynamically adjusts the size of the buffer based + * on the amount of data that's passed through it. + * + * @see TBufferedOutputBase + */ +class TAdaptiveBufferedOutput: public TBufferedOutputBase { +public: + TAdaptiveBufferedOutput(IOutputStream* slave); + ~TAdaptiveBufferedOutput() override; + + TAdaptiveBufferedOutput(TAdaptiveBufferedOutput&&) noexcept = default; + TAdaptiveBufferedOutput& operator=(TAdaptiveBufferedOutput&&) noexcept = default; +}; + +namespace NPrivate { + // TBufferedOutput that has `Finish` propagation to the underlying stream switched on. + struct TMyBufferedOutput: public TBufferedOutput { + inline TMyBufferedOutput(IOutputStream* slave, size_t buflen) + : TBufferedOutput(slave, buflen) + { + SetFinishPropagateMode(true); + } + }; + + // Selects the buffered wrapper for T: TBufferedInput when T derives from IInputStream, + // TMyBufferedOutput otherwise. + template <class T> + struct TBufferedStreamFor { + using TResult = std::conditional_t<std::is_base_of<IInputStream, T>::value, TBufferedInput, TMyBufferedOutput>; + }; +} + +/** + * A mixin class that turns unbuffered stream into a buffered one. + * + * Note that using this mixin with a stream that is already buffered won't + * result in double buffering, e.g. `TBuffered<TBuffered<TUnbufferedFileInput>>` and + * `TBuffered<TUnbufferedFileInput>` are basically the same types. + * + * Example usage: + * @code + * TBuffered<TUnbufferedFileInput> file_input(1024, "/path/to/file"); + * TBuffered<TUnbufferedFileOutput> file_output(1024, "/path/to/file"); + * @endcode + * Here 1024 is the size of the buffer. 
+ */ +template <class TSlave> +class TBuffered: private TEmbedPolicy<TSlave>, public ::NPrivate::TBufferedStreamFor<TSlave>::TResult { + using TSlaveBase = TEmbedPolicy<TSlave>; + using TBufferedBase = typename ::NPrivate::TBufferedStreamFor<TSlave>::TResult; + +public: + /** + * @param b Size of the buffer. + * @param args Arguments forwarded to the constructor of the underlying stream. + */ + template <typename... Args> + inline TBuffered(size_t b, Args&&... args) + : TSlaveBase(std::forward<Args>(args)...) + , TBufferedBase(TSlaveBase::Ptr(), b) + { + } + + /** + * @returns Reference to the underlying stream held by this wrapper. + */ + inline TSlave& Slave() noexcept { + return *this->Ptr(); + } + + TBuffered(const TBuffered&) = delete; + TBuffered& operator=(const TBuffered&) = delete; + TBuffered(TBuffered&&) = delete; + TBuffered& operator=(TBuffered&&) = delete; +}; + +/** + * A mixin class that turns unbuffered stream into an adaptively buffered one. + * Created stream differs from the one created via `TBuffered` template in that + * it dynamically adjusts the size of the buffer based on the amount of data + * that's passed through it. + * + * Example usage: + * @code + * TAdaptivelyBuffered<TUnbufferedFileOutput> file_output("/path/to/file"); + * @endcode + */ +template <class TSlave> +class TAdaptivelyBuffered: private TEmbedPolicy<TSlave>, public TAdaptiveBufferedOutput { + using TSlaveBase = TEmbedPolicy<TSlave>; + +public: + template <typename... Args> + inline TAdaptivelyBuffered(Args&&... args) + : TSlaveBase(std::forward<Args>(args)...) 
+ , TAdaptiveBufferedOutput(TSlaveBase::Ptr()) + { + } + + TAdaptivelyBuffered(const TAdaptivelyBuffered&) = delete; + TAdaptivelyBuffered& operator=(const TAdaptivelyBuffered&) = delete; + TAdaptivelyBuffered(TAdaptivelyBuffered&& other) = delete; + TAdaptivelyBuffered& operator=(TAdaptivelyBuffered&& other) = delete; +}; + +/** @} */ diff --git a/util/stream/buffered_ut.cpp b/util/stream/buffered_ut.cpp new file mode 100644 index 0000000000..41d2fc3030 --- /dev/null +++ b/util/stream/buffered_ut.cpp @@ -0,0 +1,142 @@ +#include "buffered.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/string.h> +#include <util/random/mersenne.h> + +Y_UNIT_TEST_SUITE(TestBufferedIO) { + // Writes 1000 generated strings of fewer than 10000 characters each to the stream. + // The generator is default-constructed on every call. + template <class TOut> + inline void Run(TOut&& out) { + TMersenne<ui64> r; + + for (size_t i = 0; i < 1000; ++i) { + const size_t c = r.GenRand() % 10000; + TString s; + + for (size_t j = 0; j < c; ++j) { + s.append('A' + (r.GenRand() % 10)); + } + + out.Write(s.data(), s.size()); + } + } + + // Fixed-size and adaptive buffering must produce the same output for the same writes. + Y_UNIT_TEST(TestEqual) { + TString s1; + TString s2; + + Run(TBuffered<TStringOutput>(8000, s1)); + Run(TAdaptivelyBuffered<TStringOutput>(s2)); + + UNIT_ASSERT_VALUES_EQUAL(s1, s2); + } + + // A write smaller than the buffer must reach the string once the temporary is destroyed. + Y_UNIT_TEST(Test1) { + TString s; + + TBuffered<TStringOutput>(100, s).Write("1", 1); + + UNIT_ASSERT_VALUES_EQUAL(s, "1"); + } + + // A write larger than the buffer. + Y_UNIT_TEST(Test2) { + TString s; + + TBuffered<TStringOutput>(1, s).Write("12", 2); + + UNIT_ASSERT_VALUES_EQUAL(s, "12"); + } + + // Buffered data followed by a write that does not fit, then an explicit Finish. + Y_UNIT_TEST(Test3) { + TString s; + + auto&& b = TBuffered<TStringOutput>(1, s); + + b.Write("1", 1); + b.Write("12", 2); + b.Finish(); + + UNIT_ASSERT_VALUES_EQUAL(s, "112"); + } + + // Single-character writes through a one-byte buffer. + Y_UNIT_TEST(Test4) { + TString s; + + auto&& b = TBuffered<TStringOutput>(1, s); + + b.Write('1'); + b.Write('2'); + b.Write('3'); + b.Finish(); + + UNIT_ASSERT_VALUES_EQUAL(s, "123"); + } + + // Appends 43210 generated characters to str and writes str to output through the + // zero-copy Next/Undo interface. + template <class TOut> + inline void DoGenAndWrite(TOut&& output, TString& str) { + TMersenne<ui64> r; + for (size_t i = 0; i < 43210; ++i) { + str.append('A' + 
(r.GenRand() % 10)); + } + size_t written = 0; + void* ptr = nullptr; + while (written < str.size()) { + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT(ptr && bufferSize > 0); + size_t toWrite = Min(bufferSize, str.size() - written); + memcpy(ptr, str.begin() + written, toWrite); + written += toWrite; + if (toWrite < bufferSize) { + // Give back the part of the region that was not filled. + output.Undo(bufferSize - toWrite); + } + } + } + + Y_UNIT_TEST(TestWriteViaNextAndUndo) { + TString str1, str2; + DoGenAndWrite(TBuffered<TStringOutput>(5000, str1), str2); + + UNIT_ASSERT_STRINGS_EQUAL(str1, str2); + } + + Y_UNIT_TEST(TestWriteViaNextAndUndoAdaptive) { + TString str1, str2; + DoGenAndWrite(TAdaptivelyBuffered<TStringOutput>(str1), str2); + + UNIT_ASSERT_STRINGS_EQUAL(str1, str2); + } + + // Interleaves Read and Skip on a 5-byte buffer; the trailing comments give the stream + // offset reached after each call. + Y_UNIT_TEST(TestInput) { + TString s("0123456789abcdefghijklmn"); + TBuffered<TStringInput> in(5, s); + char c; + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //1 + UNIT_ASSERT_VALUES_EQUAL(c, '0'); + UNIT_ASSERT_VALUES_EQUAL(in.Skip(4), 4); //5 end of buffer + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //6 + UNIT_ASSERT_VALUES_EQUAL(c, '5'); + UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //9 + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //10 end of buffer + UNIT_ASSERT_VALUES_EQUAL(c, '9'); + UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //13 + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //14 start new buffer + UNIT_ASSERT_VALUES_EQUAL(c, 'd'); + UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 6); //20 + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //21 start new buffer + UNIT_ASSERT_VALUES_EQUAL(c, 'k'); + UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 3); //24 eof + } + + // ReadTo across several refills of a 2-byte buffer, including a missing delimiter. + Y_UNIT_TEST(TestReadTo) { + TString s("0123456789abc"); + TBuffered<TStringInput> in(2, s); + TString t; + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '7'), 8); + UNIT_ASSERT_VALUES_EQUAL(t, "0123456"); + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '8'), 1); + UNIT_ASSERT_VALUES_EQUAL(t, ""); + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 4); + UNIT_ASSERT_VALUES_EQUAL(t, "9abc"); + } +} diff 
--git a/util/stream/debug.cpp b/util/stream/debug.cpp new file mode 100644 index 0000000000..afd5b3e1c7 --- /dev/null +++ b/util/stream/debug.cpp @@ -0,0 +1,50 @@ +#include "null.h" +#include "debug.h" + +#include <util/string/cast.h> +#include <util/generic/singleton.h> +#include <util/generic/yexception.h> + +#include <cstdio> +#include <cstdlib> + +// Writes the data to stderr with fwrite and throws yexception on a short write. +void TDebugOutput::DoWrite(const void* buf, size_t len) { + if (len != fwrite(buf, 1, len, stderr)) { + ythrow yexception() << "write failed(" << LastSystemErrorText() << ")"; + } +} + +namespace { + // Chooses the debug stream and level from the DBGOUT environment variable at + // construction: when it is set, output goes to Cerr and its value is parsed as the + // level (0 if parsing throws yexception); otherwise output goes to Cnull with level 0. + struct TDbgSelector { + inline TDbgSelector() { + char* dbg = getenv("DBGOUT"); + if (dbg) { + Out = &Cerr; + try { + Level = FromString(dbg); + } catch (const yexception&) { + Level = 0; + } + } else { + Out = &Cnull; + Level = 0; + } + } + + IOutputStream* Out; + int Level; + }; +} + +template <> +struct TSingletonTraits<TDbgSelector> { + static constexpr size_t Priority = 8; +}; + +IOutputStream& StdDbgStream() noexcept { + return *(Singleton<TDbgSelector>()->Out); +} + +int StdDbgLevel() noexcept { + return Singleton<TDbgSelector>()->Level; +} diff --git a/util/stream/debug.h b/util/stream/debug.h new file mode 100644 index 0000000000..92d6d4b42d --- /dev/null +++ b/util/stream/debug.h @@ -0,0 +1,53 @@ +#pragma once + +#include "output.h" + +/** + * @addtogroup Streams + * @{ + */ + +/** + * Debug output stream. Writes into `stderr`. + */ +class TDebugOutput: public IOutputStream { +public: + inline TDebugOutput() noexcept = default; + ~TDebugOutput() override = default; + + TDebugOutput(TDebugOutput&&) noexcept = default; + TDebugOutput& operator=(TDebugOutput&&) noexcept = default; + +private: + void DoWrite(const void* buf, size_t len) override; +}; + +/** + * @returns Standard debug stream. + * @see Cdbg + */ +IOutputStream& StdDbgStream() noexcept; + +/** + * This function returns the current debug level as set via `DBGOUT` environment + * variable. 
+ * + * Note that the proper way to use this function is via `Y_DBGTRACE` macro. + * There are very few cases when there is a need to use it directly. + * + * @returns Debug level. + * @see ETraceLevel + * @see Y_DBGTRACE + */ +int StdDbgLevel() noexcept; + +/** + * Standard debug stream. + * + * Behavior of this stream is controlled via `DBGOUT` environment variable. + * If this variable is set, then this stream is redirected into `stderr`, + * otherwise whatever is written into it is simply ignored. + */ +#define Cdbg (StdDbgStream()) + +/** @} */ diff --git a/util/stream/direct_io.cpp b/util/stream/direct_io.cpp new file mode 100644 index 0000000000..649033af34 --- /dev/null +++ b/util/stream/direct_io.cpp @@ -0,0 +1,47 @@ +#include "direct_io.h" + +#include <util/generic/utility.h> + +size_t TRandomAccessFileInput::DoRead(void* buf, size_t len) { + const size_t result = File.Pread(buf, len, Position); + Position += result; + return result; +} + +TRandomAccessFileInput::TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position) + : File(file) + , Position(position) +{ +} + +size_t TRandomAccessFileInput::DoSkip(size_t len) { + size_t skiped = Min(len, (size_t)Min((ui64)Max<size_t>(), File.GetLength() - Position)); + Position += skiped; + return skiped; +} + +TRandomAccessFileOutput::TRandomAccessFileOutput(TDirectIOBufferedFile& file) + : File(&file) +{ +} + +void TRandomAccessFileOutput::DoWrite(const void* buf, size_t len) { + File->Write(buf, len); +} + +void TRandomAccessFileOutput::DoFlush() { + File->FlushData(); +} + +TBufferedFileOutputEx::TBufferedFileOutputEx(const TString& path, EOpenMode oMode, size_t buflen) + : TRandomAccessFileOutput(*(new TDirectIOBufferedFile(path, oMode, buflen))) + , FileHolder(File) +{ +} + +void TBufferedFileOutputEx::DoFinish() { + FileHolder->Finish(); +} + +void TBufferedFileOutputEx::DoFlush() { +} diff --git a/util/stream/direct_io.h b/util/stream/direct_io.h new file mode 100644 index 0000000000..2e1f2e07dd --- 
/dev/null +++ b/util/stream/direct_io.h @@ -0,0 +1,43 @@ +#pragma once + +#include "input.h" +#include "output.h" +#include <util/system/direct_io.h> + +class TRandomAccessFileInput: public IInputStream { +public: + TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position); + +protected: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + +private: + TDirectIOBufferedFile& File; + ui64 Position; +}; + +class TRandomAccessFileOutput: public IOutputStream { +public: + TRandomAccessFileOutput(TDirectIOBufferedFile& file); + + TRandomAccessFileOutput(TRandomAccessFileOutput&&) noexcept = default; + TRandomAccessFileOutput& operator=(TRandomAccessFileOutput&&) noexcept = default; + +protected: + TDirectIOBufferedFile* File; + +private: + void DoWrite(const void* buf, size_t len) override; + void DoFlush() override; +}; + +class TBufferedFileOutputEx: public TRandomAccessFileOutput { +public: + TBufferedFileOutputEx(const TString& path, EOpenMode oMode, size_t buflen = 1 << 17); + +private: + void DoFlush() override; + void DoFinish() override; + THolder<TDirectIOBufferedFile> FileHolder; +}; diff --git a/util/stream/direct_io_ut.cpp b/util/stream/direct_io_ut.cpp new file mode 100644 index 0000000000..01d09db232 --- /dev/null +++ b/util/stream/direct_io_ut.cpp @@ -0,0 +1,71 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/string.h> +#include <util/generic/array_size.h> +#include <util/system/env.h> + +#include "buffered.h" +#include "direct_io.h" + +Y_UNIT_TEST_SUITE(TDirectIOTests) { + // Decrease numBufToWrite further if tests continue to time out + static void Y_NO_INLINE Test(EOpenMode mode, size_t numBufToWrite) { + const char TEMPLATE[] = "qwertyuiopQWERTYUIOPasdfghjklASD"; + const auto TEMPLATE_SIZE = Y_ARRAY_SIZE(TEMPLATE) - 1; + static_assert(TEMPLATE_SIZE > 0, "must be greater than zero"); + + const size_t BUFFER_SIZE = 32 * 1024; + static_assert(0 == BUFFER_SIZE % TEMPLATE_SIZE, 
"must be divisible"); + + const size_t CHUNK_SIZE_TO_READ = 512; + static_assert(0 == CHUNK_SIZE_TO_READ % TEMPLATE_SIZE, "must be divisible"); + + // filling buffer + // TEMPLATE|TEMPLATE|TEMPLATE|... + auto&& buffer = TBuffer{BUFFER_SIZE}; + for (size_t i = 0; i < BUFFER_SIZE / TEMPLATE_SIZE; ++i) { + buffer.Append(TEMPLATE, TEMPLATE_SIZE); + } + + // filling file + // TEMPLATE|TEMPLATE|TEMPLATE|... + const auto fileName = TString("test.file"); + auto&& directIOBuffer = TDirectIOBufferedFile{fileName, RdWr | CreateAlways | mode}; + { + auto&& output = TRandomAccessFileOutput{directIOBuffer}; + for (size_t i = 0; i < numBufToWrite; ++i) { + output.Write(buffer.Data(), BUFFER_SIZE); + } + } + + auto&& reader = TRandomAccessFileInput{directIOBuffer, 0}; + auto&& input = TBufferedInput{&reader, 1 << 17}; + auto bytesRead = size_t{}; + while (auto len = input.Read(buffer.Data(), CHUNK_SIZE_TO_READ)) { + bytesRead += len; + while (len) { + if (len < TEMPLATE_SIZE) { + UNIT_ASSERT(!memcmp(buffer.Data(), TEMPLATE, len)); + len = 0; + } else { + UNIT_ASSERT(!memcmp(buffer.Data(), TEMPLATE, TEMPLATE_SIZE)); + len -= TEMPLATE_SIZE; + } + } + } + + UNIT_ASSERT_VALUES_EQUAL(bytesRead, numBufToWrite * BUFFER_SIZE); + } + + Y_UNIT_TEST(ReadWriteTest) { + Test(0, 100 * 32); + } + + Y_UNIT_TEST(ReadWriteDirectTest) { + Test(Direct, 100 * 4); + } + + Y_UNIT_TEST(ReadWriteDirectSeqTest) { + Test(Direct | Seq, 100 * 4); + } +} diff --git a/util/stream/file.cpp b/util/stream/file.cpp new file mode 100644 index 0000000000..dc5d2f6311 --- /dev/null +++ b/util/stream/file.cpp @@ -0,0 +1,97 @@ +#include "file.h" + +#include <util/memory/blob.h> +#include <util/generic/yexception.h> + +TUnbufferedFileInput::TUnbufferedFileInput(const TString& path) + : File_(path, OpenExisting | RdOnly | Seq) +{ + if (!File_.IsOpen()) { + ythrow TIoException() << "file " << path << " not open"; + } +} + +TUnbufferedFileInput::TUnbufferedFileInput(const TFile& file) + : File_(file) +{ + if 
(!File_.IsOpen()) { + ythrow TIoException() << "file (" << file.GetName() << ") not open"; + } +} + +size_t TUnbufferedFileInput::DoRead(void* buf, size_t len) { + return File_.ReadOrFail(buf, len); +} + +size_t TUnbufferedFileInput::DoSkip(size_t len) { + if (len < 384) { + /* Base implementation calls DoRead, which results in one system call + * instead of three as in fair skip implementation. For small sizes + * actually doing one read is cheaper. Experiments show that the + * border that separates two implementations performance-wise lies + * in the range of 384-512 bytes (assuming that the file is in OS cache). */ + return IInputStream::DoSkip(len); + } + + /* TFile::Seek can seek beyond the end of file, so we need to do + * size check here. */ + i64 size = File_.GetLength(); + i64 oldPos = File_.GetPosition(); + i64 newPos = File_.Seek(Min<i64>(size, oldPos + len), sSet); + + return newPos - oldPos; +} + +TUnbufferedFileOutput::TUnbufferedFileOutput(const TString& path) + : File_(path, CreateAlways | WrOnly | Seq) +{ + if (!File_.IsOpen()) { + ythrow TFileError() << "can not open " << path; + } +} + +TUnbufferedFileOutput::TUnbufferedFileOutput(const TFile& file) + : File_(file) +{ + if (!File_.IsOpen()) { + ythrow TIoException() << "closed file(" << file.GetName() << ") passed"; + } +} + +TUnbufferedFileOutput::~TUnbufferedFileOutput() = default; + +void TUnbufferedFileOutput::DoWrite(const void* buf, size_t len) { + File_.Write(buf, len); +} + +void TUnbufferedFileOutput::DoFlush() { + if (File_.IsOpen()) { + File_.Flush(); + } +} + +class TMappedFileInput::TImpl: public TBlob { +public: + inline TImpl(TFile file) + : TBlob(TBlob::FromFile(file)) + { + } + + inline ~TImpl() = default; +}; + +TMappedFileInput::TMappedFileInput(const TFile& file) + : TMemoryInput(nullptr, 0) + , Impl_(new TImpl(file)) +{ + Reset(Impl_->Data(), Impl_->Size()); +} + +TMappedFileInput::TMappedFileInput(const TString& path) + : TMemoryInput(nullptr, 0) + , Impl_(new 
TImpl(TFile(path, OpenExisting | RdOnly))) +{ + Reset(Impl_->Data(), Impl_->Size()); +} + +TMappedFileInput::~TMappedFileInput() = default; diff --git a/util/stream/file.h b/util/stream/file.h new file mode 100644 index 0000000000..c1cf4f591d --- /dev/null +++ b/util/stream/file.h @@ -0,0 +1,108 @@ +#pragma once + +#include "fwd.h" +#include "input.h" +#include "output.h" +#include "buffered.h" +#include "mem.h" + +#include <util/system/file.h> +#include <utility> + +/** + * @addtogroup Streams_Files + * @{ + */ + +/** + * Unbuffered file input stream. + * + * Note that the input is not buffered, which means that `ReadLine` calls will + * be _very_ slow. + */ +class TUnbufferedFileInput: public IInputStream { +public: + TUnbufferedFileInput(const TFile& file); + TUnbufferedFileInput(const TString& path); + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + +private: + TFile File_; +}; + +/** + * Memory-mapped file input stream. + */ +class TMappedFileInput: public TMemoryInput { +public: + TMappedFileInput(const TFile& file); + TMappedFileInput(const TString& path); + ~TMappedFileInput() override; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +/** + * File output stream. + * + * Note that the output is unbuffered, thus writing in many small chunks is + * likely to be quite slow. + */ +class TUnbufferedFileOutput: public IOutputStream { +public: + TUnbufferedFileOutput(const TString& path); + TUnbufferedFileOutput(const TFile& file); + ~TUnbufferedFileOutput() override; + + TUnbufferedFileOutput(TUnbufferedFileOutput&&) noexcept = default; + TUnbufferedFileOutput& operator=(TUnbufferedFileOutput&&) noexcept = default; + +private: + void DoWrite(const void* buf, size_t len) override; + void DoFlush() override; + +private: + TFile File_; +}; + +/** + * Buffered file input stream. 
+ * + * @see TBuffered + */ +class TFileInput: public TBuffered<TUnbufferedFileInput> { +public: + template <class T> + inline TFileInput(T&& t, size_t buf = 1 << 13) + : TBuffered<TUnbufferedFileInput>(buf, std::forward<T>(t)) + { + } + + ~TFileInput() override = default; +}; + +/** + * Buffered file output stream. + * + * Currently deprecated, please use TFileOutput in new code. + * + * @deprecated + * @see TBuffered + */ +class TFixedBufferFileOutput: public TBuffered<TUnbufferedFileOutput> { +public: + template <class T> + inline TFixedBufferFileOutput(T&& t, size_t buf = 1 << 13) + : TBuffered<TUnbufferedFileOutput>(buf, std::forward<T>(t)) + { + } + + ~TFixedBufferFileOutput() override = default; +}; + +/** @} */ diff --git a/util/stream/file_ut.cpp b/util/stream/file_ut.cpp new file mode 100644 index 0000000000..ac0f09796e --- /dev/null +++ b/util/stream/file_ut.cpp @@ -0,0 +1,74 @@ +#include "file.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/tempfile.h> + +static const char* TmpFileName = "./fileio"; +static const char* TmpFileContents = "To do good to Mankind is the chivalrous plan"; +static const char* TmpFileSubstring = strstr(TmpFileContents, "chivalrous"); + +Y_UNIT_TEST_SUITE(TFileTest) { + Y_UNIT_TEST(InputTest) { + TTempFile tmp(TmpFileName); + + { + TUnbufferedFileOutput output(TmpFileName); + output.Write(TmpFileContents, strlen(TmpFileContents)); + } + + { + TUnbufferedFileInput input(TmpFileName); + TString s = input.ReadAll(); + UNIT_ASSERT_VALUES_EQUAL(s, TmpFileContents); + } + + { + TUnbufferedFileInput input(TmpFileName); + input.Skip(TmpFileSubstring - TmpFileContents); + TString s = input.ReadAll(); + UNIT_ASSERT_VALUES_EQUAL(s, "chivalrous plan"); + } + + { + TUnbufferedFileOutput output(TFile::ForAppend(TmpFileName)); + output.Write(TmpFileContents, strlen(TmpFileContents)); + } + + { + TUnbufferedFileInput input(TmpFileName); + TString s = input.ReadAll(); + UNIT_ASSERT_VALUES_EQUAL(s, 
TString::Join(TmpFileContents, TmpFileContents)); + } + } + + Y_UNIT_TEST(EmptyMapTest) { + TTempFile tmp(TmpFileName); + + { + TUnbufferedFileOutput output(TmpFileName); + /* Write nothing. */ + } + + { + TMappedFileInput input(TmpFileName); + TString s = input.ReadAll(); + UNIT_ASSERT(s.empty()); + } + } + +#ifdef _unix_ + Y_UNIT_TEST(PipeReadLineTest) { + int fds[2]; + UNIT_ASSERT(pipe(fds) == 0); + TFile readEnd(fds[0]); + TFileInput fileInput(readEnd); + UNIT_ASSERT_VALUES_EQUAL(write(fds[1], "hello\n", 6), 6); + + TString line; + UNIT_ASSERT(fileInput.ReadLine(line)); + UNIT_ASSERT_STRINGS_EQUAL(line, "hello"); + close(fds[1]); + } +#endif +} diff --git a/util/stream/format.cpp b/util/stream/format.cpp new file mode 100644 index 0000000000..3996130df5 --- /dev/null +++ b/util/stream/format.cpp @@ -0,0 +1,134 @@ +#include "format.h" +#include "output.h" + +#include <util/generic/ymath.h> +#include <util/string/cast.h> + +namespace NFormatPrivate { + static inline i64 Round(double value) { + double res1 = floor(value); + double res2 = ceil(value); + return (value - res1 < res2 - value) ? (i64)res1 : (i64)res2; + } + + static inline IOutputStream& PrintDoubleShortly(IOutputStream& os, const double& d) { + // General case: request 3 significant digits + // Side-effect: allows exponential representation + EFloatToStringMode mode = PREC_NDIGITS; + int ndigits = 3; + + if (IsValidFloat(d) && Abs(d) < 1e12) { + // For reasonably-sized finite values, it's better to avoid + // exponential representation. + // Use compact fixed representation and determine + // precision based on magnitude. 
+ mode = PREC_POINT_DIGITS_STRIP_ZEROES; + if (i64(Abs(d) * 100) < 1000) { + ndigits = 2; + } else if (i64(Abs(d) * 10) < 1000) { + ndigits = 1; + } else { + ndigits = 0; + } + } + + return os << Prec(d, mode, ndigits); + } +} + +template <> +void Out<NFormatPrivate::THumanReadableSize>(IOutputStream& stream, const NFormatPrivate::THumanReadableSize& value) { + ui64 base = value.Format == SF_BYTES ? 1024 : 1000; + ui64 base2 = base * base; + ui64 base3 = base * base2; + ui64 base4 = base * base3; + + double v = value.Value; + if (v < 0) { + stream << "-"; + v = -v; + } + + if (v < base) { + NFormatPrivate::PrintDoubleShortly(stream, v); + } else if (v < base2) { + NFormatPrivate::PrintDoubleShortly(stream, v / (double)base) << 'K'; + } else if (v < base3) { + NFormatPrivate::PrintDoubleShortly(stream, v / (double)base2) << 'M'; + } else if (v < base4) { + NFormatPrivate::PrintDoubleShortly(stream, v / (double)base3) << 'G'; + } else { + NFormatPrivate::PrintDoubleShortly(stream, v / (double)base4) << 'T'; + } + + if (value.Format == SF_BYTES) { + if (v < base) { + stream << "B"; + } else { + stream << "iB"; + } + } +} + +template <> +void Out<NFormatPrivate::THumanReadableDuration>(IOutputStream& os, const NFormatPrivate::THumanReadableDuration& hr) { + TTempBuf buf; + TMemoryOutput ss(buf.Data(), buf.Size()); + + do { + ui64 microSeconds = hr.Value.MicroSeconds(); + if (microSeconds < 1000) { + ss << microSeconds << "us"; + break; + } + if (microSeconds < 1000 * 1000) { + NFormatPrivate::PrintDoubleShortly(ss, (double)microSeconds / 1000.0) << "ms"; + break; + } + + double seconds = (double)(hr.Value.MilliSeconds()) / 1000.0; + if (seconds < 60) { + NFormatPrivate::PrintDoubleShortly(ss, seconds) << 's'; + break; + } + + ui64 s = NFormatPrivate::Round(seconds * 1000 + 0.5) / 1000; + + ui64 m = s / 60; + s = s % 60; + + ui64 h = m / 60; + m = m % 60; + + ui64 d = h / 24; + h = h % 24; + + ui64 times[] = {d, h, m, s}; + char names[] = {'d', 'h', 'm', 's'}; + bool 
first = true; + + for (size_t i = 0; i < Y_ARRAY_SIZE(times); ++i) { + if (times[i] > 0) { + if (!first) { + ss << ' '; + } + ss << times[i] << names[i]; + first = false; + } + } + } while (false); + + size_t written = buf.Size() - ss.Avail(); + os.Write(buf.Data(), written); +} + +void Time(IOutputStream& l) { + l << millisec(); +} + +void TimeHumanReadable(IOutputStream& l) { + char timeStr[30]; + const time_t t = time(nullptr); + + l << ctime_r(&t, timeStr); +} diff --git a/util/stream/format.h b/util/stream/format.h new file mode 100644 index 0000000000..b033208a1b --- /dev/null +++ b/util/stream/format.h @@ -0,0 +1,444 @@ +#pragma once + +#include "mem.h" +#include "output.h" + +#include <util/datetime/base.h> +#include <util/generic/strbuf.h> +#include <util/generic/flags.h> +#include <util/memory/tempbuf.h> +#include <util/string/cast.h> + +enum ENumberFormatFlag { + HF_FULL = 0x01, /**< Output number with leading zeros. */ + HF_ADDX = 0x02, /**< Output '0x' or '0b' before hex/bin digits. */ +}; +Y_DECLARE_FLAGS(ENumberFormat, ENumberFormatFlag) +Y_DECLARE_OPERATORS_FOR_FLAGS(ENumberFormat) + +enum ESizeFormat { + SF_QUANTITY, /**< Base 1000, usual suffixes. 1100 gets turned into "1.1K". */ + SF_BYTES, /**< Base 1024, byte suffix. 1100 gets turned into "1.07KiB". 
*/ +}; + +namespace NFormatPrivate { + template <size_t Value> + struct TLog2: std::integral_constant<size_t, TLog2<Value / 2>::value + 1> {}; + + template <> + struct TLog2<1>: std::integral_constant<size_t, 0> {}; + + static inline void WriteChars(IOutputStream& os, char c, size_t count) { + if (count == 0) + return; + TTempBuf buf(count); + memset(buf.Data(), c, count); + os.Write(buf.Data(), count); + } + + template <typename T> + struct TLeftPad { + T Value; + size_t Width; + char Padc; + + inline TLeftPad(const T& value, size_t width, char padc) + : Value(value) + , Width(width) + , Padc(padc) + { + } + }; + + template <typename T> + IOutputStream& operator<<(IOutputStream& o, const TLeftPad<T>& lp) { + TTempBuf buf; + TMemoryOutput ss(buf.Data(), buf.Size()); + ss << lp.Value; + size_t written = buf.Size() - ss.Avail(); + if (lp.Width > written) { + WriteChars(o, lp.Padc, lp.Width - written); + } + o.Write(buf.Data(), written); + return o; + } + + template <typename T> + struct TRightPad { + T Value; + size_t Width; + char Padc; + + inline TRightPad(const T& value, size_t width, char padc) + : Value(value) + , Width(width) + , Padc(padc) + { + } + }; + + template <typename T> + IOutputStream& operator<<(IOutputStream& o, const TRightPad<T>& lp) { + TTempBuf buf; + TMemoryOutput ss(buf.Data(), buf.Size()); + ss << lp.Value; + size_t written = buf.Size() - ss.Avail(); + o.Write(buf.Data(), written); + if (lp.Width > written) { + WriteChars(o, lp.Padc, lp.Width - written); + } + return o; + } + + template <typename T, size_t Base> + struct TBaseNumber { + T Value; + ENumberFormat Flags; + + template <typename OtherT> + inline TBaseNumber(OtherT value, ENumberFormat flags) + : Value(value) + , Flags(flags) + { + } + }; + + template <typename T, size_t Base> + using TUnsignedBaseNumber = TBaseNumber<std::make_unsigned_t<std::remove_cv_t<T>>, Base>; + + template <typename T, size_t Base> + IOutputStream& operator<<(IOutputStream& stream, const TBaseNumber<T, 
Base>& value) { + char buf[8 * sizeof(T) + 1]; /* Add 1 for sign. */ + TStringBuf str(buf, IntToString<Base>(value.Value, buf, sizeof(buf))); + + if (str[0] == '-') { + stream << '-'; + str.Skip(1); + } + + if (value.Flags & HF_ADDX) { + if (Base == 16) { + stream << TStringBuf("0x"); + } else if (Base == 2) { + stream << TStringBuf("0b"); + } + } + + if (value.Flags & HF_FULL) { + WriteChars(stream, '0', (8 * sizeof(T) + TLog2<Base>::value - 1) / TLog2<Base>::value - str.size()); + } + + stream << str; + return stream; + } + + template <typename Char, size_t Base> + struct TBaseText { + TBasicStringBuf<Char> Text; + + inline TBaseText(const TBasicStringBuf<Char> text) + : Text(text) + { + } + }; + + template <typename Char, size_t Base> + IOutputStream& operator<<(IOutputStream& os, const TBaseText<Char, Base>& text) { + for (size_t i = 0; i < text.Text.size(); ++i) { + if (i != 0) { + os << ' '; + } + os << TUnsignedBaseNumber<Char, Base>(text.Text[i], HF_FULL); + } + return os; + } + + template <typename T> + struct TFloatPrecision { + using TdVal = std::remove_cv_t<T>; + static_assert(std::is_floating_point<TdVal>::value, "expect std::is_floating_point<TdVal>::value"); + + TdVal Value; + EFloatToStringMode Mode; + int NDigits; + }; + + template <typename T> + IOutputStream& operator<<(IOutputStream& o, const TFloatPrecision<T>& prec) { + char buf[512]; + size_t count = FloatToString(prec.Value, buf, sizeof(buf), prec.Mode, prec.NDigits); + o << TStringBuf(buf, count); + return o; + } + + struct THumanReadableDuration { + TDuration Value; + + constexpr THumanReadableDuration(const TDuration& value) + : Value(value) + { + } + }; + + struct THumanReadableSize { + double Value; + ESizeFormat Format; + }; +} + +/** + * Output manipulator basically equivalent to `std::setw` and `std::setfill` + * combined. + * + * When written into a `IOutputStream`, writes out padding characters first, + * and then provided value. 
+ * + * Example usage: + * @code + * stream << LeftPad(12345, 10, '0'); // Will output "0000012345" + * @endcode + * + * @param value Value to output. + * @param width Target total width. + * @param padc Character to use for padding. + * @see RightPad + */ +template <typename T> +static constexpr ::NFormatPrivate::TLeftPad<T> LeftPad(const T& value, const size_t width, const char padc = ' ') noexcept { + return ::NFormatPrivate::TLeftPad<T>(value, width, padc); +} + +template <typename T, int N> +static constexpr ::NFormatPrivate::TLeftPad<const T*> LeftPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept { + return ::NFormatPrivate::TLeftPad<const T*>(value, width, padc); +} + +/** + * Output manipulator similar to `std::setw` and `std::setfill`. + * + * When written into a `IOutputStream`, writes provided value first, and then + * the padding characters. + * + * Example usage: + * @code + * stream << RightPad("column1", 10, ' '); // Will output "column1   " + * @endcode + * + * @param value Value to output. + * @param width Target total width. + * @param padc Character to use for padding. + * @see LeftPad + */ +template <typename T> +static constexpr ::NFormatPrivate::TRightPad<T> RightPad(const T& value, const size_t width, const char padc = ' ') noexcept { + return ::NFormatPrivate::TRightPad<T>(value, width, padc); +} + +template <typename T, int N> +static constexpr ::NFormatPrivate::TRightPad<const T*> RightPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept { + return ::NFormatPrivate::TRightPad<const T*>(value, width, padc); +} + +/** + * Output manipulator similar to `std::setbase(16)`. + * + * When written into a `IOutputStream`, writes out the provided value in + * hexadecimal form. The value is treated as unsigned, even if its type is in + * fact signed. 
+ * + * Example usage: + * @code + * stream << Hex(-1); // Will output "0xFFFFFFFF" + * stream << Hex(1ull); // Will output "0x0000000000000001" + * @endcode + * + * @param value Value to output. + * @param flags Output flags. + */ +template <typename T> +static constexpr ::NFormatPrivate::TUnsignedBaseNumber<T, 16> Hex(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept { + return {value, flags}; +} + +/** + * Output manipulator similar to `std::setbase(16)`. + * + * When written into a `IOutputStream`, writes out the provided value in + * hexadecimal form. + * + * Example usage: + * @code + * stream << SHex(-1); // Will output "-0x00000001" + * stream << SHex(1ull); // Will output "0x0000000000000001" + * @endcode + * + * @param value Value to output. + * @param flags Output flags. + */ +template <typename T> +static constexpr ::NFormatPrivate::TBaseNumber<T, 16> SHex(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept { + return {value, flags}; +} + +/** + * Output manipulator similar to `std::setbase(2)`. + * + * When written into a `IOutputStream`, writes out the provided value in + * binary form. The value is treated as unsigned, even if its type is in + * fact signed. + * + * Example usage: + * @code + * stream << Bin(-1); // Will output "0b11111111111111111111111111111111" + * stream << Bin(1); // Will output "0b00000000000000000000000000000001" + * @endcode + * + * @param value Value to output. + * @param flags Output flags. + */ +template <typename T> +static constexpr ::NFormatPrivate::TUnsignedBaseNumber<T, 2> Bin(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept { + return {value, flags}; +} + +/** + * Output manipulator similar to `std::setbase(2)`. + * + * When written into a `IOutputStream`, writes out the provided value in + * binary form. 
+ * + * Example usage: + * @code + * stream << SBin(-1); // Will output "-0b00000000000000000000000000000001" + * stream << SBin(1); // Will output "0b00000000000000000000000000000001" + * @endcode + * + * @param value Value to output. + * @param flags Output flags. + */ +template <typename T> +static constexpr ::NFormatPrivate::TBaseNumber<T, 2> SBin(const T& value, const ENumberFormat flags = HF_FULL | HF_ADDX) noexcept { + return {value, flags}; +} + +/** + * Output manipulator for hexadecimal string output. + * + * When written into a `IOutputStream`, writes out the provided characters + * in hexadecimal form divided by space character. + * + * Example usage: + * @code + * stream << HexText(TStringBuf("abcи")); // Will output "61 62 63 D0 B8" + * stream << HexText(TWtringBuf(u"abcи")); // Will output "0061 0062 0063 0438" + * @endcode + * + * @param value String to output. + */ +template <typename TChar> +static inline ::NFormatPrivate::TBaseText<TChar, 16> HexText(const TBasicStringBuf<TChar> value) { + return ::NFormatPrivate::TBaseText<TChar, 16>(value); +} + +/** + * Output manipulator for binary string output. + * + * When written into a `IOutputStream`, writes out the provided characters + * in binary form divided by space character. + * + * Example usage: + * @code + * stream << BinText(TStringBuf("aaa")); // Will output "01100001 01100001 01100001" + * @endcode + * + * @param value String to output. + */ +template <typename TChar> +static inline ::NFormatPrivate::TBaseText<TChar, 2> BinText(const TBasicStringBuf<TChar> value) { + return ::NFormatPrivate::TBaseText<TChar, 2>(value); +} + +/** + * Output manipulator for printing `TDuration` values. + * + * When written into a `IOutputStream`, writes out the provided `TDuration` + * in auto-adjusted human-readable format. 
+ * + * Example usage: + * @code + * stream << HumanReadable(TDuration::MicroSeconds(100)); // Will output "100us" + * stream << HumanReadable(TDuration::Seconds(3672)); // Will output "1h 1m 12s" + * @endcode + * + * @param duration Value to output. + */ +static constexpr ::NFormatPrivate::THumanReadableDuration HumanReadable(const TDuration duration) noexcept { + return ::NFormatPrivate::THumanReadableDuration(duration); +} + +/** + * Output manipulator for writing out human-readable number of elements / memory + * amount in `ls -h` style. + * + * When written into a `IOutputStream`, writes out the provided numeric + * value with small precision and a suffix (like 'K', 'M', 'G' for numbers, or + * 'B', 'KiB', 'MiB', 'GiB' for bytes). + * + * For quantities, base 1000 is used. For bytes, base is 1024. + * + * Example usage: + * @code + * stream << HumanReadableSize(1024, SF_QUANTITY); // Will output "1.02K" + * stream << HumanReadableSize(1024, SF_BYTES); // Will output "1KiB" + * stream << "average usage " << HumanReadableSize(100 / 3., SF_BYTES); // Will output "average usage 33.3B" + * @endcode + * + * @param size Value to output. + * @param format Format to use. + */ +static constexpr ::NFormatPrivate::THumanReadableSize HumanReadableSize(const double size, ESizeFormat format) noexcept { + return {size, format}; +} + +void Time(IOutputStream& l); +void TimeHumanReadable(IOutputStream& l); + +/** + * Output manipulator for adjusting precision of floating point values. + * + * When written into a `IOutputStream`, writes out the provided floating point + * variable with given precision. The behavior depends on provided `mode`. + * + * Example usage: + * @code + * stream << Prec(1.2345678901234567, PREC_AUTO); // Will output "1.2345678901234567" + * @endcode + * + * @param value float or double to output. + * @param mode Output mode. + * @param ndigits Number of significant digits (in `PREC_NDIGITS` and `PREC_POINT_DIGITS` mode). 
+ * @see EFloatToStringMode + */ +template <typename T> +static constexpr ::NFormatPrivate::TFloatPrecision<T> Prec(const T& value, const EFloatToStringMode mode, const int ndigits = 0) noexcept { + return {value, mode, ndigits}; +} + +/** + * Output manipulator for adjusting precision of floating point values. + * + * When written into a `IOutputStream`, writes out the provided floating point + * variable with given precision. The behavior is equivalent to `Prec(value, PREC_NDIGITS, ndigits)`. + * + * Example usage: + * @code + * stream << Prec(1.2345678901234567, 3); // Will output "1.23" + * @endcode + * + * @param value float or double to output. + * @param ndigits Number of significant digits. + */ +template <typename T> +static constexpr ::NFormatPrivate::TFloatPrecision<T> Prec(const T& value, const int ndigits) noexcept { + return {value, PREC_NDIGITS, ndigits}; +} diff --git a/util/stream/format_ut.cpp b/util/stream/format_ut.cpp new file mode 100644 index 0000000000..43245aeb48 --- /dev/null +++ b/util/stream/format_ut.cpp @@ -0,0 +1,182 @@ +#include "format.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/charset/wide.h> + +Y_UNIT_TEST_SUITE(TOutputStreamFormattingTest) { + Y_UNIT_TEST(TestLeftPad) { + TStringStream ss; + ss << LeftPad(10, 4, '0'); + UNIT_ASSERT_VALUES_EQUAL("0010", ss.Str()); + + ss.Clear(); + ss << LeftPad(222, 1); + UNIT_ASSERT_VALUES_EQUAL("222", ss.Str()); + } + + Y_UNIT_TEST(TestRightPad) { + TStringStream ss; + ss << RightPad("aa", 4); + UNIT_ASSERT_VALUES_EQUAL("aa ", ss.Str()); + + ss.Clear(); + ss << RightPad("aa", 1); + UNIT_ASSERT_VALUES_EQUAL("aa", ss.Str()); + } + + Y_UNIT_TEST(TestTime) { + TStringStream ss; + + ss << "[" << Time << "] " + << "qwqw" << TimeHumanReadable << Endl; + } + + Y_UNIT_TEST(TestHexReference) { + /* + One possible implementation of Hex() stores a reference to the given object. 
+ This can lead to wrong results if the given object is a temporary + which is valid only during constructor call. The following code tries to + demonstrate this. If the implementation stores a reference, + the test fails if compiled with g++44 in debug build + (without optimizations), but performs correctly in release build. + */ + THolder<TStringStream> ss(new TStringStream); + THolder<int> ii(new int(0x1234567)); + (*ss) << Hex(*ii); + UNIT_ASSERT_VALUES_EQUAL("0x01234567", ss->Str()); + } + + Y_UNIT_TEST(TestHexText) { + { + TStringStream ss; + ss << HexText(TStringBuf("abcи")); + UNIT_ASSERT_VALUES_EQUAL("61 62 63 D0 B8", ss.Str()); + } + { + TStringStream ss; + TUtf16String w = UTF8ToWide("abcи"); + ss << HexText<wchar16>(w); + UNIT_ASSERT_VALUES_EQUAL("0061 0062 0063 0438", ss.Str()); + } + } + + Y_UNIT_TEST(TestBin) { + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(2), nullptr)), "10"); + UNIT_ASSERT_VALUES_EQUAL(ToString(SBin(static_cast<i32>(-2), nullptr)), "-10"); + UNIT_ASSERT_VALUES_EQUAL(ToString(SBin(static_cast<i32>(-2))), "-0b00000000000000000000000000000010"); + UNIT_ASSERT_VALUES_EQUAL(ToString(SBin(static_cast<i32>(-2), HF_FULL)), "-00000000000000000000000000000010"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(15), nullptr)), "1111"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(1))), "0b00000000000000000000000000000001"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(-1))), "0b11111111111111111111111111111111"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<i32>(-1))), "0b11111111111111111111111111111111"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<i32>(-1), nullptr)), "11111111111111111111111111111111"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui32>(256))), "0b00000000000000000000000100000000"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui8>(16))), "0b00010000"); + UNIT_ASSERT_VALUES_EQUAL(ToString(Bin(static_cast<ui64>(1234587912357ull))), 
"0b0000000000000000000000010001111101110011001011001000100010100101"); + } + + Y_UNIT_TEST(TestBinText) { + UNIT_ASSERT_VALUES_EQUAL(ToString(BinText(TStringBuf("\1"))), "00000001"); + UNIT_ASSERT_VALUES_EQUAL(ToString(BinText(TStringBuf("\1\1"))), "00000001 00000001"); + UNIT_ASSERT_VALUES_EQUAL(ToString(BinText(TStringBuf("aaa"))), "01100001 01100001 01100001"); + } + + Y_UNIT_TEST(TestPrec) { + TStringStream ss; + ss << Prec(1.2345678901234567, PREC_AUTO); + UNIT_ASSERT_VALUES_EQUAL("1.2345678901234567", ss.Str()); + + ss.Clear(); + ss << Prec(1.2345678901234567, 3); + UNIT_ASSERT_VALUES_EQUAL("1.23", ss.Str()); + + ss.Clear(); + ss << Prec(1.2345678901234567, PREC_POINT_DIGITS, 3); + UNIT_ASSERT_VALUES_EQUAL("1.235", ss.Str()); + } + + Y_UNIT_TEST(TestHumanReadableSize1000) { + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(0, SF_QUANTITY)), "0"); + + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1, SF_QUANTITY)), "1"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000, SF_QUANTITY)), "1K"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1234567, SF_QUANTITY)), "1.23M"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(12345678, SF_QUANTITY)), "12.3M"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(12345678 * 1000ull, SF_QUANTITY)), "12.3G"); + + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1, SF_QUANTITY)), "-1"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000, SF_QUANTITY)), "-1K"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1234567, SF_QUANTITY)), "-1.23M"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-12345678, SF_QUANTITY)), "-12.3M"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-12345678 * 1000ll, SF_QUANTITY)), "-12.3G"); + } + + Y_UNIT_TEST(TestHumanReadableSize1024) { + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(0, SF_BYTES)), "0B"); + + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(100, SF_BYTES)), "100B"); + 
UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1024, SF_BYTES)), "1KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(2.25 * 1024 * 1024, SF_BYTES)), "2.25MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(2.5 * 1024, SF_BYTES)), "2.5KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(45.3 * 1024, SF_BYTES)), "45.3KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1024 * 1024, SF_BYTES)), "1MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(5 * 1024 * 1024, SF_BYTES)), "5MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1236 * 1024 * 1024, SF_BYTES)), "1.21GiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1024ull * 1024 * 1024 * 1024, SF_BYTES)), "1TiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(100 / 3., SF_BYTES)), "33.3B"); + + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-100, SF_BYTES)), "-100B"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1024, SF_BYTES)), "-1KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-2.25 * 1024 * 1024, SF_BYTES)), "-2.25MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-2.5 * 1024, SF_BYTES)), "-2.5KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-45.3 * 1024, SF_BYTES)), "-45.3KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1024 * 1024, SF_BYTES)), "-1MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-5 * 1024 * 1024, SF_BYTES)), "-5MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1236 * 1024 * 1024, SF_BYTES)), "-1.21GiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1024ll * 1024 * 1024 * 1024, SF_BYTES)), "-1TiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-100 / 3., SF_BYTES)), "-33.3B"); + + // XXX: For 1000 <= x < 1024, Prec(x, 3) falls back to exponential form + + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll, SF_BYTES)), "1000B"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll, SF_BYTES)), 
"1010B"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024, SF_BYTES)), "1000KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024, SF_BYTES)), "1010KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024 * 1024, SF_BYTES)), "1000MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024 * 1024, SF_BYTES)), "1010MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024 * 1024 * 1024, SF_BYTES)), "1000GiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024 * 1024 * 1024, SF_BYTES)), "1010GiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1000ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "1000TiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(1010ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "1010TiB"); + + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll, SF_BYTES)), "-1000B"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll, SF_BYTES)), "-1010B"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024, SF_BYTES)), "-1000KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024, SF_BYTES)), "-1010KiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024 * 1024, SF_BYTES)), "-1000MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024 * 1024, SF_BYTES)), "-1010MiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024 * 1024 * 1024, SF_BYTES)), "-1000GiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024 * 1024 * 1024, SF_BYTES)), "-1010GiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1000ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "-1000TiB"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadableSize(-1010ll * 1024 * 1024 * 1024 * 1024, SF_BYTES)), "-1010TiB"); + } + + Y_UNIT_TEST(TestHumanReadableDuration) { + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(0))), "0us"); + 
UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(1))), "1us"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(100))), "100us"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(1234))), "1.23ms"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(12345))), "12.3ms"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::MicroSeconds(1234567))), "1.23s"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(5))), "5s"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(59.9))), "59.9s"); + + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(60))), "1m"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(61))), "1m 1s"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(72))), "1m 12s"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(620))), "10m 20s"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(3600))), "1h"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(3672))), "1h 1m 12s"); + UNIT_ASSERT_VALUES_EQUAL(ToString(HumanReadable(TDuration::Seconds(4220))), "1h 10m 20s"); + } +} diff --git a/util/stream/fwd.cpp b/util/stream/fwd.cpp new file mode 100644 index 0000000000..4214b6df83 --- /dev/null +++ b/util/stream/fwd.cpp @@ -0,0 +1 @@ +#include "fwd.h" diff --git a/util/stream/fwd.h b/util/stream/fwd.h new file mode 100644 index 0000000000..307676c6a7 --- /dev/null +++ b/util/stream/fwd.h @@ -0,0 +1,100 @@ +#pragma once + +#include <util/system/types.h> + +class IInputStream; +class IOutputStream; + +class IZeroCopyInput; +class IZeroCopyInputFastReadTo; +class IZeroCopyOutput; + +using TStreamManipulator = void (*)(IOutputStream&); + +class TLengthLimitedInput; +class TCountingInput; +class TCountingOutput; + +class TMemoryInput; +class TMemoryOutput; +class TMemoryWriteBuffer; + +class TMultiInput; + 
+class TNullInput; +class TNullOutput; +class TNullIO; + +class TPipeBase; +class TPipeInput; +class TPipeOutput; +class TPipedBase; +class TPipedInput; +class TPipedOutput; + +class TStringInput; +class TStringOutput; +class TStringStream; + +class TTeeOutput; + +class TTempBufOutput; + +struct TEol; + +template <typename TEndOfToken> +class TStreamTokenizer; + +enum ETraceLevel: ui8; + +class IWalkInput; + +struct TZLibError; +struct TZLibCompressorError; +struct TZLibDecompressorError; + +namespace ZLib { + enum StreamType: ui8; +} + +class TZLibDecompress; +class TZLibCompress; +class TBufferedZLibDecompress; + +using TZDecompress = TBufferedZLibDecompress; + +class TAlignedInput; +class TAlignedOutput; + +class TBufferInput; +class TBufferOutput; +class TBufferStream; + +class TBufferedInput; +class TBufferedOutputBase; +class TBufferedOutput; +class TAdaptiveBufferedOutput; + +template <class TSlave> +class TBuffered; + +template <class TSlave> +class TAdaptivelyBuffered; + +class TDebugOutput; + +class TRandomAccessFileInput; +class TRandomAccessFileOutput; +class TBufferedFileOutputEx; + +class TUnbufferedFileInput; +class TMappedFileInput; +class TUnbufferedFileOutput; + +class TFileInput; +using TIFStream = TFileInput; + +class TFixedBufferFileOutput; +using TOFStream = TFixedBufferFileOutput; + +using TFileOutput = TAdaptivelyBuffered<TUnbufferedFileOutput>; diff --git a/util/stream/hex.cpp b/util/stream/hex.cpp new file mode 100644 index 0000000000..1c05330504 --- /dev/null +++ b/util/stream/hex.cpp @@ -0,0 +1,30 @@ +#include "hex.h" + +#include "output.h" +#include <util/string/hex.h> + +void HexEncode(const void* in, size_t len, IOutputStream& out) { + static const size_t NUM_OF_BYTES = 32; + char buffer[NUM_OF_BYTES * 2]; + + auto current = static_cast<const char*>(in); + for (size_t take = 0; len; current += take, len -= take) { + take = Min(NUM_OF_BYTES, len); + HexEncode(current, take, buffer); + out.Write(buffer, take * 2); + } +} + +void 
HexDecode(const void* in, size_t len, IOutputStream& out) { + Y_ENSURE(!(len & 1), TStringBuf("Odd buffer length passed to HexDecode")); + + static const size_t NUM_OF_BYTES = 32; + char buffer[NUM_OF_BYTES]; + + auto current = static_cast<const char*>(in); + for (size_t take = 0; len; current += take, len -= take) { + take = Min(NUM_OF_BYTES * 2, len); + HexDecode(current, take, buffer); + out.Write(buffer, take / 2); + } +} diff --git a/util/stream/hex.h b/util/stream/hex.h new file mode 100644 index 0000000000..a018933b1b --- /dev/null +++ b/util/stream/hex.h @@ -0,0 +1,8 @@ +#pragma once + +#include <util/system/types.h> + +class IOutputStream; + +void HexEncode(const void* in, size_t len, IOutputStream& out); +void HexDecode(const void* in, size_t len, IOutputStream& out); diff --git a/util/stream/hex_ut.cpp b/util/stream/hex_ut.cpp new file mode 100644 index 0000000000..5074a0b616 --- /dev/null +++ b/util/stream/hex_ut.cpp @@ -0,0 +1,29 @@ +#include "hex.h" + +#include <library/cpp/testing/unittest/registar.h> +#include "str.h" + +Y_UNIT_TEST_SUITE(THexCodingTest) { + void TestImpl(const TString& data) { + TString encoded; + TStringOutput encodedOut(encoded); + HexEncode(data.data(), data.size(), encodedOut); + + UNIT_ASSERT_EQUAL(encoded.size(), data.size() * 2); + + TString decoded; + TStringOutput decodedOut(decoded); + HexDecode(encoded.data(), encoded.size(), decodedOut); + + UNIT_ASSERT_EQUAL(decoded, data); + } + + Y_UNIT_TEST(TestEncodeDecodeToStream) { + TString data = "100ABAcaba500,$%0987123456 \n\t\x01\x02\x03."; + TestImpl(data); + } + + Y_UNIT_TEST(TestEmpty) { + TestImpl(""); + } +} diff --git a/util/stream/holder.cpp b/util/stream/holder.cpp new file mode 100644 index 0000000000..f5617eef58 --- /dev/null +++ b/util/stream/holder.cpp @@ -0,0 +1 @@ +#include "holder.h" diff --git a/util/stream/holder.h b/util/stream/holder.h new file mode 100644 index 0000000000..c60a4e510c --- /dev/null +++ b/util/stream/holder.h @@ -0,0 +1,44 @@ +#pragma once 
+ +#include <util/generic/ptr.h> + +#include <utility> +#include <type_traits> + +class IInputStream; +class IOutputStream; + +namespace NPrivate { + template <class Stream, bool isInput = std::is_base_of<IInputStream, Stream>::value> + struct TStreamBase { + using TType = IInputStream; + }; + + template <class Stream> + struct TStreamBase<Stream, false> { + using TType = IOutputStream; + }; + +} + +/** + * An ownership-gaining wrapper for proxy streams. + * + * Example usage: + * \code + * TCountingInput* input = new THoldingStream<TCountingInput>(new TStringInput(s)); + * \endcode + * + * In this example, resulting counting input also owns a string input that it + * was constructed on top of. + */ +template <class Base, class StreamBase = typename ::NPrivate::TStreamBase<Base>::TType> +class THoldingStream: private THolder<StreamBase>, public Base { +public: + template <class... Args> + inline THoldingStream(THolder<StreamBase> stream, Args&&... args) + : THolder<StreamBase>(std::move(stream)) + , Base(this->Get(), std::forward<Args>(args)...) 
+ { + } +}; diff --git a/util/stream/input.cpp b/util/stream/input.cpp new file mode 100644 index 0000000000..6e8170f2f9 --- /dev/null +++ b/util/stream/input.cpp @@ -0,0 +1,344 @@ +#include "input.h" +#include "output.h" +#include "str.h" + +#include <util/charset/wide.h> +#include <util/memory/tempbuf.h> +#include <util/generic/string.h> +#include <util/generic/yexception.h> +#include <util/generic/singleton.h> +#include <util/string/cast.h> +#include <util/system/compat.h> +#include <util/system/spinlock.h> + +#include <cstdlib> + +IInputStream::IInputStream() noexcept = default; + +IInputStream::~IInputStream() = default; + +size_t IInputStream::DoReadTo(TString& st, char to) { + char ch; + + if (!Read(&ch, 1)) { + return 0; + } + + st.clear(); + + size_t result = 0; + do { + ++result; + + if (ch == to) { + break; + } + + st += ch; + } while (Read(&ch, 1)); + + return result; +} + +ui64 IInputStream::DoReadAll(IOutputStream& out) { + TTempBuf buffer; + void* ptr = buffer.Data(); + size_t size = buffer.Size(); + + ui64 result = 0; + while (size_t read = Read(ptr, size)) { + out.Write(ptr, read); + result += read; + } + + return result; +} + +size_t IInputStream::Load(void* buf_in, size_t len) { + char* buf = (char*)buf_in; + + while (len) { + const size_t ret = Read(buf, len); + + buf += ret; + len -= ret; + + if (ret == 0) { + break; + } + } + + return buf - (char*)buf_in; +} + +void IInputStream::LoadOrFail(void* buf, size_t len) { + const size_t realLen = Load(buf, len); + if (Y_UNLIKELY(realLen != len)) { + ythrow yexception() << "Failed to read required number of bytes from stream! 
Expected: " << len << ", gained: " << realLen << "!"; + } +} + +size_t IInputStream::ReadLine(TString& st) { + const size_t ret = ReadTo(st, '\n'); + + if (ret && !st.empty() && st.back() == '\r') { + st.pop_back(); + } + + return ret; +} + +size_t IInputStream::ReadLine(TUtf16String& w) { + TString s; + size_t result = ReadLine(s); + + if (result) { + UTF8ToWide(s, w); + } + + return result; +} + +TString IInputStream::ReadLine() { + TString ret; + + if (!ReadLine(ret)) { + ythrow yexception() << "can not read line from stream"; + } + + return ret; +} + +TString IInputStream::ReadTo(char ch) { + TString ret; + + if (!ReadTo(ret, ch)) { + ythrow yexception() << "can not read from stream"; + } + + return ret; +} + +size_t IInputStream::Skip(size_t sz) { + return DoSkip(sz); +} + +size_t IInputStream::DoSkip(size_t sz) { + if (sz < 128) { + return Load(alloca(sz), sz); + } + + TTempBuf buf; + size_t total = 0; + + while (sz) { + const size_t lresult = Read(buf.Data(), Min<size_t>(sz, buf.Size())); + + if (lresult == 0) { + return total; + } + + total += lresult; + sz -= lresult; + } + + return total; +} + +TString IInputStream::ReadAll() { + TString result; + TStringOutput stream(result); + + DoReadAll(stream); + + return result; +} + +ui64 IInputStream::ReadAll(IOutputStream& out) { + return DoReadAll(out); +} + +ui64 TransferData(IInputStream* in, IOutputStream* out) { + return in->ReadAll(*out); +} + +namespace { + struct TStdIn: public IInputStream { + ~TStdIn() override = default; + + size_t DoRead(void* buf, size_t len) override { + const size_t ret = fread(buf, 1, len, F_); + + if (ret < len && ferror(F_)) { + ythrow TSystemError() << "can not read from stdin"; + } + + return ret; + } + + FILE* F_ = stdin; + }; + +#if defined(_win_) + using TGetLine = TStdIn; +#else + #if defined(_bionic_) + using TGetLineBase = TStdIn; + #else + struct TGetLineBase: public TStdIn { + ~TGetLineBase() override { + free(B_); + } + + size_t DoReadTo(TString& st, char ch) override 
{ + auto&& guard = Guard(M_); + + (void)guard; + + const auto r = getdelim(&B_, &L_, ch, F_); + + if (r < 0) { + if (ferror(F_)) { + ythrow TSystemError() << "can not read from stdin"; + } + + st.clear(); + + return 0; + } + + st.AssignNoAlias(B_, r); + + if (st && st.back() == ch) { + st.pop_back(); + } + + return r; + } + + TAdaptiveLock M_; + char* B_ = nullptr; + size_t L_ = 0; + }; + #endif + + #if defined(_glibc_) || defined(_cygwin_) + // glibc does not have fgetln + using TGetLine = TGetLineBase; + #else + struct TGetLine: public TGetLineBase { + size_t DoReadTo(TString& st, char ch) override { + if (ch == '\n') { + size_t len = 0; + auto r = fgetln(F_, &len); + + if (r) { + st.AssignNoAlias(r, len); + + if (st && st.back() == '\n') { + st.pop_back(); + } + + return len; + } + } + + return TGetLineBase::DoReadTo(st, ch); + } + }; + #endif +#endif +} + +IInputStream& NPrivate::StdInStream() noexcept { + return *SingletonWithPriority<TGetLine, 4>(); +} + +// implementation of >> operator + +// helper functions + +static inline bool IsStdDelimiter(char c) { + return (c == '\0') || (c == ' ') || (c == '\r') || (c == '\n') || (c == '\t'); +} + +static void ReadUpToDelimiter(IInputStream& i, TString& s) { + char c; + while (i.ReadChar(c)) { // skip delimiters + if (!IsStdDelimiter(c)) { + s += c; + break; + } + } + while (i.ReadChar(c) && !IsStdDelimiter(c)) { // read data (with trailing delimiter) + s += c; + } +} + +// specialization for string-related stuff + +template <> +void In<TString>(IInputStream& i, TString& s) { + s.resize(0); + ReadUpToDelimiter(i, s); +} + +template <> +void In<TUtf16String>(IInputStream& i, TUtf16String& w) { + TString s; + ReadUpToDelimiter(i, s); + + if (s.empty()) { + w.erase(); + } else { + w = UTF8ToWide(s); + } +} + +// specialization for char types + +#define SPEC_FOR_CHAR(T) \ + template <> \ + void In<T>(IInputStream & i, T & t) { \ + i.ReadChar((char&)t); \ + } + +SPEC_FOR_CHAR(char) +SPEC_FOR_CHAR(unsigned char) 
+SPEC_FOR_CHAR(signed char) + +#undef SPEC_FOR_CHAR + +// specialization for number types + +#define SPEC_FOR_NUMBER(T) \ + template <> \ + void In<T>(IInputStream & i, T & t) { \ + char buf[128]; \ + size_t pos = 0; \ + while (i.ReadChar(buf[0])) { \ + if (!IsStdDelimiter(buf[0])) { \ + ++pos; \ + break; \ + } \ + } \ + while (i.ReadChar(buf[pos]) && !IsStdDelimiter(buf[pos]) && pos < 127) { \ + ++pos; \ + } \ + t = FromString<T, char>(buf, pos); \ + } + +SPEC_FOR_NUMBER(signed short) +SPEC_FOR_NUMBER(signed int) +SPEC_FOR_NUMBER(signed long int) +SPEC_FOR_NUMBER(signed long long int) +SPEC_FOR_NUMBER(unsigned short) +SPEC_FOR_NUMBER(unsigned int) +SPEC_FOR_NUMBER(unsigned long int) +SPEC_FOR_NUMBER(unsigned long long int) + +SPEC_FOR_NUMBER(float) +SPEC_FOR_NUMBER(double) +SPEC_FOR_NUMBER(long double) + +#undef SPEC_FOR_NUMBER diff --git a/util/stream/input.h b/util/stream/input.h new file mode 100644 index 0000000000..f0d5807ed2 --- /dev/null +++ b/util/stream/input.h @@ -0,0 +1,273 @@ +#pragma once + +#include <util/generic/fwd.h> +#include <util/generic/noncopyable.h> +#include <util/system/defaults.h> + +class IOutputStream; + +/** + * @addtogroup Streams_Base + * @{ + */ + +/** + * Abstract input stream. + */ +class IInputStream: public TNonCopyable { +public: + IInputStream() noexcept; + virtual ~IInputStream(); + + IInputStream(IInputStream&&) noexcept { + } + + IInputStream& operator=(IInputStream&&) noexcept { + return *this; + } + + /** + * Reads some data from the stream. Note that this function might read less + * data than what was requested. Use `Load` function if you want to read as + * much data as possible. + * + * @param buf Buffer to read into. + * @param len Number of bytes to read. + * @returns Number of bytes that were actually read. + * A return value of zero signals end of stream. 
+ */ + inline size_t Read(void* buf, size_t len) { + if (len == 0) { + return 0; + } + + return DoRead(buf, len); + } + + /** + * Reads one character from the stream. + * + * @param[out] c Character to read. + * @returns Whether the character was read. + * A return value of false signals the end + * of stream. + */ + inline bool ReadChar(char& c) { + return DoRead(&c, 1) > 0; + } + + /** + * Reads all characters from the stream until the given character is + * encountered, and stores them into the given string. The character itself + * is read from the stream, but not stored in the string. + * + * @param[out] st String to read into. + * @param ch Character to stop at. + * @returns Total number of characters read from the stream. + * A return value of zero signals end of stream. + */ + inline size_t ReadTo(TString& st, char ch) { + return DoReadTo(st, ch); + } + + /** + * Reads the requested amount of data from the stream. Unlike `Read`, this + * function stops only when the requested amount of data is read, or when + * end of stream is reached. + * + * @param buf Buffer to read into. + * @param len Number of bytes to read. + * @returns Number of bytes that were actually read. + * A return value different from `len` + * signals end of stream. + */ + size_t Load(void* buf, size_t len); + + /** + * Reads the requested amount of data from the stream, or fails with an + * exception if unable to do so. + * + * @param buf Buffer to read into. + * @param len Number of bytes to read. + * @see Load + */ + void LoadOrFail(void* buf, size_t len); + + /** + * Reads all data from this stream and returns it as a string. + * + * @returns Contents of this stream as a string. + */ + TString ReadAll(); + + /** + * Reads all data from this stream and writes it into a provided output + * stream. + * + * @param out Output stream to use. + * @returns Total number of characters read from the stream. 
+ */ + ui64 ReadAll(IOutputStream& out); + + /** + * Reads all data from the stream until the first occurrence of '\n'. Also + * handles Windows line breaks correctly. + * + * @returns Next line read from this stream, + * excluding the line terminator. + * @throws yexception If no data could be read from a stream + * because end of stream has already been + * reached. + */ + TString ReadLine(); + + /** + * Reads all characters from the stream until the given character is + * encountered and returns them as a string. The character itself is read + * from the stream, but not stored in the string. + * + * @param ch Character to stop at. + * @returns String containing all the characters read. + * @throws yexception If no data could be read from a stream + * because end of stream has already been + * reached. + */ + TString ReadTo(char ch); + + /** + * Reads all data from the stream until the first occurrence of '\n' and + * stores it into provided string. Also handles Windows line breaks correctly. + * + * @param[out] st String to store read characters into, + * excluding the line terminator. + * @returns Total number of characters read from the stream. + * A return value of zero signals end of stream. + */ + size_t ReadLine(TString& st); + + /** + * Reads UTF8 encoded characters from the stream until the first occurrence of '\n', + * converts them into wide ones, and stores into provided string. Also handles + * Windows line breaks correctly. + * + * @param[out] w Wide string to store read characters into, + * excluding the line terminator. + * @returns Total number of characters read from the stream. + * A return value of zero signals end of stream. + */ + size_t ReadLine(TUtf16String& w); + + /** + * Skips some data from the stream without reading / copying it. Note that + * this function might skip less data than what was requested. + * + * @param len Number of bytes to skip. + * @returns Number of bytes that were actually skipped. 
+ * A return value of zero signals end of stream. + */ + size_t Skip(size_t len); + +protected: + /** + * Reads some data from the stream. Might read less data than what was + * requested. + * + * @param buf Buffer to read into. + * @param len Number of bytes to read. + * @returns Number of bytes that were actually read. + * A return value of zero signals end of stream. + * @throws yexception If IO error occurs. + */ + virtual size_t DoRead(void* buf, size_t len) = 0; + + /** + * Skips some data from the stream. Might skip less data than what was + * requested. + * + * @param len Number of bytes to skip. + * @returns Number of bytes that were actually skipped. + * A return value of zero signals end of stream. + * @throws yexception If IO error occurs. + */ + virtual size_t DoSkip(size_t len); + + /** + * Reads all characters from the stream until the given character is + * encountered, and stores them into the given string. The character itself + * is read from the stream, but not stored in the string. + * + * Provided string is cleared only if there is data in the stream. + * + * @param[out] st String to read into. + * @param ch Character to stop at. + * @returns Total number of characters read from the stream. + * A return value of zero signals end of stream. + * @throws yexception If IO error occurs. + */ + virtual size_t DoReadTo(TString& st, char ch); + + /** + * Reads all data from this stream and writes it into a provided output + * stream. + * + * @param out Output stream to use. + * @returns Total number of characters read from + * this stream. + * @throws yexception If IO error occurs. + */ + virtual ui64 DoReadAll(IOutputStream& out); +}; + +/** + * Transfers all data from the given input stream into the given output stream. + * + * @param in Input stream. + * @param out Output stream. + */ +ui64 TransferData(IInputStream* in, IOutputStream* out); + +/** + * `operator>>` for `IInputStream` by default delegates to this function. 
+ * + * Note that while `operator>>` uses overloading (and thus argument-dependent + * lookup), `In` uses template specializations. This makes it possible to + * have a single `In` declaration, and then just provide specializations in + * cpp files, letting the linker figure everything else out. This approach + * reduces compilation times. + * + * However, if the flexibility of overload resolution is needed, then one should + * just overload `operator>>`. + * + * @param in Input stream to read from. + * @param[out] value Value to read. + * @throws `yexception` on invalid input or end of stream. + * @see Out(IOutputStream&, T&) + */ +template <typename T> +void In(IInputStream& in, T& value); + +/** + * Reads a value from the stream. + * + * @param in Input stream to read from. + * @param[out] value Value to read. + * @returns Input stream. + * @throws `yexception` on invalid input or end of stream. + * @see operator<<(IOutputStream&, T&) + */ +template <typename T> +inline IInputStream& operator>>(IInputStream& in, T& value) { + In<T>(in, value); + return in; +} + +namespace NPrivate { + IInputStream& StdInStream() noexcept; +} + +/** + * Standard input stream. 
+ */ +#define Cin (::NPrivate::StdInStream()) + +/** @} */ diff --git a/util/stream/input_ut.cpp b/util/stream/input_ut.cpp new file mode 100644 index 0000000000..4a93f5458e --- /dev/null +++ b/util/stream/input_ut.cpp @@ -0,0 +1,157 @@ +#include "input.h" +#include "output.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/file.h> +#include <util/system/yassert.h> + +#ifdef _win_ + #include <io.h> +#endif + +class TMockStdIn { +public: + TMockStdIn() + : StdInCopy_(dup(0)) + { + } + ~TMockStdIn() { + close(StdInCopy_); + } + + template <typename FuncType> + void ForInput(const TStringBuf text, const FuncType& func) { + TFile tempFile(TFile::Temporary("input_ut")); + tempFile.Write(text.data(), text.size()); + tempFile.FlushData(); + tempFile.Seek(0, sSet); + + TFileHandle tempFh(tempFile.GetHandle()); + tempFh.Duplicate2Posix(0); + tempFh.Release(); + + func(); + Cin.ReadAll(); + dup2(StdInCopy_, 0); + clearerr(stdin); + } + +private: + int StdInCopy_; +}; + +class TNoInput: public IInputStream { +public: + TNoInput(ui64 size) + : Size_(size) + { + } + +protected: + size_t DoRead(void*, size_t len) override { + len = Min(static_cast<ui64>(len), Size_); + Size_ -= len; + return len; + } + +private: + ui64 Size_; +}; + +class TNoOutput: public IOutputStream { +public: + TNoOutput() = default; + +protected: + void DoWrite(const void*, size_t) override { + } +}; + +class TSimpleStringInput: public IInputStream { +public: + TSimpleStringInput(const TString& string) + : String_(string) + { + } + +protected: + size_t DoRead(void* buf, size_t len) override { + Y_ASSERT(len != 0); + + if (String_.empty()) { + return 0; + } + + *static_cast<char*>(buf) = String_[0]; + String_.remove(0, 1); + return 1; + } + +private: + TString String_; +}; + +Y_UNIT_TEST_SUITE(TInputTest) { + Y_UNIT_TEST(BigTransfer) { + ui64 size = 1024ull * 1024ull * 1024ull * 5; + TNoInput input(size); + TNoOutput output; + + ui64 transferred = TransferData(&input, 
&output); + + UNIT_ASSERT_VALUES_EQUAL(transferred, size); + } + + Y_UNIT_TEST(TestReadTo) { + /* This one tests default implementation of ReadTo. */ + + TSimpleStringInput in("0123456789abc"); + + TString t; + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '7'), 8); + UNIT_ASSERT_VALUES_EQUAL(t, "0123456"); + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 5); + UNIT_ASSERT_VALUES_EQUAL(t, "89abc"); + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 0); + UNIT_ASSERT_VALUES_EQUAL(t, "89abc"); + } + + Y_UNIT_TEST(TestReadLine) { + TSimpleStringInput in("1\n22\n333"); + + TString t; + UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 2); + UNIT_ASSERT_VALUES_EQUAL(t, "1"); + UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 3); + UNIT_ASSERT_VALUES_EQUAL(t, "22"); + UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 3); + UNIT_ASSERT_VALUES_EQUAL(t, "333"); + UNIT_ASSERT_VALUES_EQUAL(in.ReadLine(t), 0); + UNIT_ASSERT_VALUES_EQUAL(t, "333"); + } + + Y_UNIT_TEST(TestStdInReadTo) { + std::pair<std::pair<TStringBuf, char>, TStringBuf> testPairs[] = { + {{"", '\n'}, ""}, + {{"\n", '\n'}, ""}, + {{"\n\t", '\t'}, "\n"}, + {{"\t\n", '\n'}, "\t"}, + {{"a\tb\n", '\t'}, "a"}}; + + TMockStdIn stdIn; + + for (const auto& testPair : testPairs) { + const TStringBuf text = testPair.first.first; + const char delim = testPair.first.second; + const TStringBuf expectedValue = testPair.second; + + stdIn.ForInput(text, + [=] { + TString value; + Cin.ReadTo(value, delim); + UNIT_ASSERT_VALUES_EQUAL(value, expectedValue); + }); + } + } +} diff --git a/util/stream/ios_ut.cpp b/util/stream/ios_ut.cpp new file mode 100644 index 0000000000..139f4296e5 --- /dev/null +++ b/util/stream/ios_ut.cpp @@ -0,0 +1,497 @@ +#include "output.h" +#include "tokenizer.h" +#include "buffer.h" +#include "buffered.h" +#include "walk.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/string/cast.h> +#include <util/memory/tempbuf.h> +#include <util/charset/wide.h> + +#include <string> +#include <iostream> + +class TStreamsTest: public 
TTestBase { + UNIT_TEST_SUITE(TStreamsTest); + UNIT_TEST(TestGenericRead); + UNIT_TEST(TestGenericWrite); + UNIT_TEST(TestReadLine); + UNIT_TEST(TestMemoryStream); + UNIT_TEST(TestBufferedIO); + UNIT_TEST(TestBufferStream); + UNIT_TEST(TestStringStream); + UNIT_TEST(TestWtrokaInput); + UNIT_TEST(TestStrokaInput); + UNIT_TEST(TestReadTo); + UNIT_TEST(TestWtrokaOutput); + UNIT_TEST(TestIStreamOperators); + UNIT_TEST(TestWchar16Output); + UNIT_TEST(TestWchar32Output); + UNIT_TEST(TestUtf16StingOutputByChars); + UNIT_TEST_SUITE_END(); + +public: + void TestGenericRead(); + void TestGenericWrite(); + void TestReadLine(); + void TestMemoryStream(); + void TestBufferedIO(); + void TestBufferStream(); + void TestStringStream(); + void TestWtrokaInput(); + void TestStrokaInput(); + void TestWtrokaOutput(); + void TestIStreamOperators(); + void TestReadTo(); + void TestWchar16Output(); + void TestWchar32Output(); + void TestUtf16StingOutputByChars(); +}; + +UNIT_TEST_SUITE_REGISTRATION(TStreamsTest); + +void TStreamsTest::TestIStreamOperators() { + TString data("first line\r\nsecond\t\xd1\x82\xd0\xb5\xd1\x81\xd1\x82 line\r\n 1 -4 59 4320000000009999999 c\n -1.5 1e-110"); + TStringInput si(data); + + TString l1; + TString l2; + TString l3; + TUtf16String w1; + TString l4; + ui16 i1; + i16 i2; + i32 i3; + ui64 i4; + char c1; + unsigned char c2; + float f1; + double f2; + + si >> l1 >> l2 >> l3 >> w1 >> l4 >> i1 >> i2 >> i3 >> i4 >> c1 >> c2 >> f1 >> f2; + + UNIT_ASSERT_EQUAL(l1, "first"); + UNIT_ASSERT_EQUAL(l2, "line"); + UNIT_ASSERT_EQUAL(l3, "second"); + UNIT_ASSERT_EQUAL(l4, "line"); + UNIT_ASSERT_EQUAL(i1, 1); + UNIT_ASSERT_EQUAL(i2, -4); + UNIT_ASSERT_EQUAL(i3, 59); + UNIT_ASSERT_EQUAL(i4, 4320000000009999999ULL); + UNIT_ASSERT_EQUAL(c1, 'c'); + UNIT_ASSERT_EQUAL(c2, '\n'); + UNIT_ASSERT_EQUAL(f1, -1.5); + UNIT_ASSERT_EQUAL(f2, 1e-110); +} + +void TStreamsTest::TestStringStream() { + TStringStream s; + + s << "qw\r\n1234" + << "\n" + << 34; + + 
UNIT_ASSERT_EQUAL(s.ReadLine(), "qw"); + UNIT_ASSERT_EQUAL(s.ReadLine(), "1234"); + + /* Text appended after earlier reads becomes readable: the following ReadLine calls continue from the current read position. */ s << "\r\n" + << 123.1; + + UNIT_ASSERT_EQUAL(s.ReadLine(), "34"); + UNIT_ASSERT_EQUAL(s.ReadLine(), "123.1"); + + UNIT_ASSERT_EQUAL(s.Str(), "qw\r\n1234\n34\r\n123.1"); + + // Test stream copying + TStringStream sc = s; + + s << "-666-" << 13; + sc << "-777-" << 0 << "JackPot"; + + UNIT_ASSERT_EQUAL(s.Str(), "qw\r\n1234\n34\r\n123.1-666-13"); + UNIT_ASSERT_EQUAL(sc.Str(), "qw\r\n1234\n34\r\n123.1-777-0JackPot"); + + TStringStream ss; + ss = s; + s << "... and some trash"; + UNIT_ASSERT_EQUAL(ss.Str(), "qw\r\n1234\n34\r\n123.1-666-13"); +} + +/* Read reports the number of bytes actually delivered: 6 first, then only the remaining 4 of the 10-byte input. */ void TStreamsTest::TestGenericRead() { + TString s("1234567890"); + TStringInput si(s); + char buf[1024]; + + UNIT_ASSERT_EQUAL(si.Read(buf, 6), 6); + UNIT_ASSERT_EQUAL(memcmp(buf, "123456", 6), 0); + UNIT_ASSERT_EQUAL(si.Read(buf, 6), 4); + UNIT_ASSERT_EQUAL(memcmp(buf, "7890", 4), 0); +} + +void TStreamsTest::TestGenericWrite() { + TString s; + TStringOutput so(s); + + so.Write("123456", 6); + so.Write("7890", 4); + + UNIT_ASSERT_EQUAL(s, "1234567890"); +} + +void TStreamsTest::TestReadLine() { + TString data("1234\r\n5678\nqw"); + TStringInput si(data); + + UNIT_ASSERT_EQUAL(si.ReadLine(), "1234"); + UNIT_ASSERT_EQUAL(si.ReadLine(), "5678"); + UNIT_ASSERT_EQUAL(si.ReadLine(), "qw"); +} + +/* Writes sizeof(buf) + 1 single bytes into a TMemoryOutput placed over buf and expects the write past the end to throw. */ void TStreamsTest::TestMemoryStream() { + char buf[1024]; + TMemoryOutput mo(buf, sizeof(buf)); + bool ehandled = false; + + try { + for (size_t i = 0; i < sizeof(buf) + 1; ++i) { + mo.Write(i % 127); + } + } catch (...) 
{ + ehandled = true; + } + + UNIT_ASSERT_EQUAL(ehandled, true); + + /* Every byte that fitted before the failure must have been stored. */ for (size_t i = 0; i < sizeof(buf); ++i) { + UNIT_ASSERT_EQUAL(buf[i], (char)(i % 127)); + } +} + +/* Test sink that appends to a TString and asserts on every DoWrite that the chunk is either shorter than BufLen_ or an exact multiple of it. */ class TMyStringOutput: public IOutputStream { +public: + inline TMyStringOutput(TString& s, size_t buflen) noexcept + : S_(s) + , BufLen_(buflen) + { + } + + ~TMyStringOutput() override = default; + + void DoWrite(const void* data, size_t len) override { + S_.Write(data, len); + UNIT_ASSERT(len < BufLen_ || ((len % BufLen_) == 0)); + } + + /* Joins all parts into one temporary string so that the size assertion in DoWrite sees the combined length of the vectored write. */ void DoWriteV(const TPart* p, size_t count) override { + TString s; + + for (size_t i = 0; i < count; ++i) { + s.append((const char*)p[i].buf, p[i].len); + } + + DoWrite(s.data(), s.size()); + } + +private: + TStringOutput S_; + const size_t BufLen_; +}; + +/* Three phases: buffered output of 1000 two-character records (2000 bytes in total), buffered input reading them back two bytes at a time, and buffered output of writes whose sizes grow from 0 to 999 bytes. */ void TStreamsTest::TestBufferedIO() { + TString s; + + { + const size_t buflen = 7; + TBuffered<TMyStringOutput> bo(buflen, s, buflen); + + for (size_t i = 0; i < 1000; ++i) { + TString str(" "); + str += ToString(i % 10); + + bo.Write(str.data(), str.size()); + } + + bo.Finish(); + } + + UNIT_ASSERT_EQUAL(s.size(), 2000); + + { + const size_t buflen = 11; + TBuffered<TStringInput> bi(buflen, s); + + for (size_t i = 0; i < 1000; ++i) { + TString str(" "); + str += ToString(i % 10); + + char buf[3]; + + UNIT_ASSERT_EQUAL(bi.Load(buf, 2), 2); + + buf[2] = 0; + + UNIT_ASSERT_EQUAL(str, buf); + } + } + + s.clear(); + + { + const size_t buflen = 13; + TBuffered<TMyStringOutput> bo(buflen, s, buflen); + TString f = "1234567890"; + + for (size_t i = 0; i < 10; ++i) { + f += f; + } + + for (size_t i = 0; i < 1000; ++i) { + bo.Write(f.data(), i); + } + + bo.Finish(); + } +} + +void TStreamsTest::TestBufferStream() { + TBufferStream stream; + TString s = "test"; + + stream.Write(s.data(), s.size()); + char buf[5]; + size_t bytesRead = stream.Read(buf, 4); + UNIT_ASSERT_EQUAL(4, bytesRead); + UNIT_ASSERT_EQUAL(0, strncmp(s.data(), buf, 4)); + + stream.Write(s.data(), s.size()); + bytesRead = stream.Read(buf, 2); + 
UNIT_ASSERT_EQUAL(2, bytesRead); + UNIT_ASSERT_EQUAL(0, strncmp("te", buf, 2)); + + bytesRead = stream.Read(buf, 2); + UNIT_ASSERT_EQUAL(2, bytesRead); + UNIT_ASSERT_EQUAL(0, strncmp("st", buf, 2)); + + /* Everything written has been consumed, so the next read delivers nothing. */ bytesRead = stream.Read(buf, 2); + UNIT_ASSERT_EQUAL(0, bytesRead); +} + +namespace { + /* IWalkInput over a vector of strings: each DoUnboundedNext call hands out the next string as one chunk and returns 0 once all strings were handed out. Only a reference to the vector is stored, so the vector has to stay alive while the stream is used. */ class TStringListInput: public IWalkInput { + public: + TStringListInput(const TVector<TString>& data) + : Data_(data) + , Index_(0) + { + } + + protected: + size_t DoUnboundedNext(const void** ptr) override { + if (Index_ >= Data_.size()) { + return 0; + } + + const TString& string = Data_[Index_++]; + + *ptr = string.data(); + return string.size(); + } + + private: + const TVector<TString>& Data_; + size_t Index_; + }; + + /* Text is the raw multi-line input and Expected lists the lines the tests expect to get from it, in order. */ const char Text[] = + // UTF8 encoded "one \ntwo\r\nthree\n\tfour\nfive\n" in russian and ... + "один \n" + "два\r\n" + "три\n" + "\tчетыре\n" + "пять\n" + // ... additional test cases + "\r\n" + "\n\r" // this char goes to the front of the next string + "one two\n" + "123\r\n" + "\t\r "; + + const char* Expected[] = { + // UTF8 encoded "one ", "two", "three", "\tfour", "five" in russian and ... + "один ", + "два", + "три", + "\tчетыре", + "пять", + // ... 
additional test cases + "", + "", + "\rone two", + "123", + "\t\r "}; + /* ReadTo stops at the delimiter and consumes it: after reading up to c the next byte delivered by Read is 4. ReadAll then returns whatever is left. */ void TestStreamReadTo1(IInputStream& input, const char* comment) { + TString tmp; + input.ReadTo(tmp, 'c'); + UNIT_ASSERT_VALUES_EQUAL_C(tmp, "111a222b333", comment); + + char tmp2; + input.Read(&tmp2, 1); + UNIT_ASSERT_VALUES_EQUAL_C(tmp2, '4', comment); + + input.ReadTo(tmp, '6'); + UNIT_ASSERT_VALUES_EQUAL_C(tmp, "44d555e", comment); + + tmp = input.ReadAll(); + UNIT_ASSERT_VALUES_EQUAL_C(tmp, "66f", comment); + } + + /* Compares every line read with the matching Expected entry. NOTE(review): the loop only guards against reading too many lines; it does not assert that all Expected entries were reached. */ void TestStreamReadTo2(IInputStream& input, const char* comment) { + TString s; + size_t i = 0; + while (input.ReadLine(s)) { + UNIT_ASSERT_C(i < Y_ARRAY_SIZE(Expected), comment); + UNIT_ASSERT_VALUES_EQUAL_C(s, Expected[i], comment); + ++i; + } + } + + void TestStreamReadTo3(IInputStream& input, const char* comment) { + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadLine(), "111a222b333c444d555e666f", comment); + } + + /* Uses the zero byte as the delimiter. */ void TestStreamReadTo4(IInputStream& input, const char* comment) { + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "one", comment); + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "two", comment); + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "three", comment); + } + + /* Counts the lines of the input, prints the elapsed time to Cout and expects exactly 100000 lines. */ void TestStrokaInput(IInputStream& input, const char* comment) { + TString line; + ui32 i = 0; + TInstant start = Now(); + while (input.ReadLine(line)) { + ++i; + } + Cout << comment << ":" << (Now() - start).SecondsFloat() << Endl; + UNIT_ASSERT_VALUES_EQUAL(i, 100000); + } + + /* Runs the same check against five input implementations fed with identical text; the second argument passed to the check names the implementation under test. */ template <class T> + void TestStreamReadTo(const TString& text, T test) { + TStringInput is(text); + test(is, "TStringInput"); + TMemoryInput mi(text.data(), text.size()); + test(mi, "TMemoryInput"); + TBuffer b(text.data(), text.size()); + TBufferInput bi(b); + test(bi, "TBufferInput"); + TStringInput slave(text); + TBufferedInput bdi(&slave); + test(bdi, "TBufferedInput"); + TVector<TString> lst(1, text); + TStringListInput sli(lst); + test(sli, "IWalkInput"); + } +} + +void TStreamsTest::TestReadTo() { + 
TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo1); + TestStreamReadTo(Text, TestStreamReadTo2); + TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo3); + TString withZero = "one"; + withZero.append('\0').append("two").append('\0').append("three"); + TestStreamReadTo(withZero, TestStreamReadTo4); +} + +/* Builds 100000 lines, line i holding i % 1000 letters, and feeds them to the line-counting helper through every stream type. */ void TStreamsTest::TestStrokaInput() { + TString s; + for (ui32 i = 0; i < 100000; ++i) { + TVector<char> d(i % 1000, 'a'); + s.append(d.data(), d.size()); + s.append('\n'); + } + TestStreamReadTo(s, ::TestStrokaInput); +} + +/* Reads Text line by line into a wide string and compares each line with the widened Expected entry. */ void TStreamsTest::TestWtrokaInput() { + const TString s(Text); + TStringInput is(s); + TUtf16String w; + size_t i = 0; + + while (is.ReadLine(w)) { + UNIT_ASSERT(i < Y_ARRAY_SIZE(Expected)); + UNIT_ASSERT_VALUES_EQUAL(w, UTF8ToWide(Expected[i])); + + ++i; + } +} + +/* Rebuilds Text from Expected: a carriage return is appended after entries 1, 5 and 8 and a line feed after every entry except the last one. */ void TStreamsTest::TestWtrokaOutput() { + TString s; + TStringOutput os(s); + const size_t n = sizeof(Expected) / sizeof(Expected[0]); + + for (size_t i = 0; i < n; ++i) { + TUtf16String w = UTF8ToWide(Expected[i]); + + os << w; + + if (i == 1 || i == 5 || i == 8) { + os << '\r'; + } + + if (i < n - 1) { + os << '\n'; + } + } + + UNIT_ASSERT(s == Text); +} + +void TStreamsTest::TestWchar16Output() { + TString s; + TStringOutput os(s); + os << wchar16(97); // latin a + os << u'\u044E'; // cyrillic ю + os << u'я'; + os << wchar16(0xD801); // high surrogate is printed as replacement character U+FFFD + os << u'b'; + + UNIT_ASSERT_VALUES_EQUAL(s, "aюя" + "\xEF\xBF\xBD" + "b"); +} + +void TStreamsTest::TestWchar32Output() { + TString s; + TStringOutput os(s); + os << wchar32(97); // latin a + os << U'\u044E'; // cyrillic ю + os << U'я'; + os << U'\U0001F600'; // grinning face + os << u'b'; + + UNIT_ASSERT_VALUES_EQUAL(s, "aюя" + "\xF0\x9F\x98\x80" + "b"); +} + +void TStreamsTest::TestUtf16StingOutputByChars() { + TString s = "\xd1\x87\xd0\xb8\xd1\x81\xd1\x82\xd0\xb8\xd1\x87\xd0\xb8\xd1\x81\xd1\x82\xd0\xb8"; + TUtf16String w = UTF8ToWide(s); + + 
UNIT_ASSERT_VALUES_EQUAL(w.size(), 10); + + TStringStream stream0; + stream0 << w; + UNIT_ASSERT_VALUES_EQUAL(stream0.Str(), s); + + /* Writing the ten code units one by one must produce the same UTF-8 bytes as writing the whole string at once. */ TStringStream stream1; + for (size_t i = 0; i < 10; i++) { + stream1 << w[i]; + } + UNIT_ASSERT_VALUES_EQUAL(stream1.Str(), s); +} diff --git a/util/stream/labeled.cpp b/util/stream/labeled.cpp new file mode 100644 index 0000000000..56a886ca30 --- /dev/null +++ b/util/stream/labeled.cpp @@ -0,0 +1 @@ +#include "labeled.h" diff --git a/util/stream/labeled.h b/util/stream/labeled.h new file mode 100644 index 0000000000..2cc539d241 --- /dev/null +++ b/util/stream/labeled.h @@ -0,0 +1,19 @@ +#pragma once + +#include <util/generic/va_args.h> + +/** + * Generates an output sequence for the provided expressions that is formatted + * as a labeled comma-separated list. + * + * Example usage: + * @code + * int a = 1, b = 2, c = 3; + * stream << LabeledOutput(a, b, c, a + b + c); + * // Outputs "a = 1, b = 2, c = 3, a + b + c = 6" + * @endcode + */ +#define LabeledOutput(...) 
"" Y_PASS_VA_ARGS(Y_MAP_ARGS_WITH_LAST(__LABELED_OUTPUT_NONLAST__, __LABELED_OUTPUT_IMPL__, __VA_ARGS__)) + +#define __LABELED_OUTPUT_IMPL__(x) << #x " = " << (x) +#define __LABELED_OUTPUT_NONLAST__(x) __LABELED_OUTPUT_IMPL__(x) << ", " diff --git a/util/stream/labeled_ut.cpp b/util/stream/labeled_ut.cpp new file mode 100644 index 0000000000..12d0dc5004 --- /dev/null +++ b/util/stream/labeled_ut.cpp @@ -0,0 +1,12 @@ +#include "str.h" + +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(TLabeledOutputTest) { + Y_UNIT_TEST(TBasicTest) { + TStringStream out; + int x = 3; + out << LabeledOutput(x, 1, 2, 3 + 4); + UNIT_ASSERT_STRINGS_EQUAL(out.Str(), "x = 3, 1 = 1, 2 = 2, 3 + 4 = 7"); + } +} diff --git a/util/stream/length.cpp b/util/stream/length.cpp new file mode 100644 index 0000000000..9907fe2ac9 --- /dev/null +++ b/util/stream/length.cpp @@ -0,0 +1,47 @@ +#include "length.h" + +/* Reads at most min(Length_, len) bytes from the slave and lowers the remaining budget by the number of bytes actually read. */ size_t TLengthLimitedInput::DoRead(void* buf, size_t len) { + const size_t toRead = (size_t)Min<ui64>(Length_, len); + const size_t ret = Slave_->Read(buf, toRead); + + Length_ -= ret; + + return ret; +} + +/* Same budget rule as DoRead, applied to skipping. */ size_t TLengthLimitedInput::DoSkip(size_t len) { + len = (size_t)Min<ui64>((ui64)len, Length_); + len = Slave_->Skip(len); + Length_ -= len; + return len; +} + +/* Every TCountingInput override forwards the call to the slave and adds the returned count to Count_. */ size_t TCountingInput::DoRead(void* buf, size_t len) { + const size_t ret = Slave_->Read(buf, len); + Count_ += ret; + return ret; +} + +size_t TCountingInput::DoSkip(size_t len) { + const size_t ret = Slave_->Skip(len); + Count_ += ret; + return ret; +} + +size_t TCountingInput::DoReadTo(TString& st, char ch) { + const size_t ret = Slave_->ReadTo(st, ch); + Count_ += ret; + return ret; +} + +ui64 TCountingInput::DoReadAll(IOutputStream& out) { + const ui64 ret = Slave_->ReadAll(out); + Count_ += ret; + return ret; +} + +/* Counts the requested length after the slave write returned. */ void TCountingOutput::DoWrite(const void* buf, size_t len) { + Slave_->Write(buf, len); + + Count_ += len; +} diff --git a/util/stream/length.h b/util/stream/length.h new file mode 100644 
index 0000000000..4d508ae24d --- /dev/null +++ b/util/stream/length.h @@ -0,0 +1,100 @@ +#pragma once + +#include "input.h" +#include "output.h" + +#include <util/generic/utility.h> + +/** + * Proxy input stream that can read a limited number of characters from a slave + * stream. + * + * This can be useful for breaking up the slave stream into small chunks and + * treat these as separate streams. + */ +class TLengthLimitedInput: public IInputStream { +public: + inline TLengthLimitedInput(IInputStream* slave, ui64 length) noexcept + : Slave_(slave) + , Length_(length) + { + } + + ~TLengthLimitedInput() override = default; + + /* Number of bytes that may still be read through this proxy. */ inline ui64 Left() const noexcept { + return Length_; + } + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + +private: + /* Slave_ is a plain pointer and the defaulted destructor does not delete it; Length_ is the remaining byte budget returned by Left(). */ IInputStream* Slave_; + ui64 Length_; +}; + +/** + * Proxy input stream that counts the number of characters read. + */ +class TCountingInput: public IInputStream { +public: + inline TCountingInput(IInputStream* slave) noexcept + : Slave_(slave) + , Count_() + { + } + + ~TCountingInput() override = default; + + /** + * \returns The total number of characters read from + * this stream. + */ + inline ui64 Counter() const noexcept { + return Count_; + } + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + size_t DoReadTo(TString& st, char ch) override; + ui64 DoReadAll(IOutputStream& out) override; + +private: + /* Count_ is value-initialised in the constructor, so counting starts at zero. */ IInputStream* Slave_; + ui64 Count_; +}; + +/** + * Proxy output stream that counts the number of characters written. + */ +class TCountingOutput: public IOutputStream { +public: + inline TCountingOutput(IOutputStream* slave) noexcept + : Slave_(slave) + , Count_() + { + } + + ~TCountingOutput() override = default; + + TCountingOutput(TCountingOutput&&) noexcept = default; + TCountingOutput& operator=(TCountingOutput&&) noexcept = default; + + /** + * \returns The total number of characters written + * into this stream. 
+ */ + inline ui64 Counter() const noexcept { + return Count_; + } + +private: + void DoWrite(const void* buf, size_t len) override; + +private: + IOutputStream* Slave_; + ui64 Count_; +}; diff --git a/util/stream/length_ut.cpp b/util/stream/length_ut.cpp new file mode 100644 index 0000000000..8968448954 --- /dev/null +++ b/util/stream/length_ut.cpp @@ -0,0 +1,52 @@ +#include "length.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/string.h> + +Y_UNIT_TEST_SUITE(TestLengthIO) { + /* With a limit of 2, loading 3 bytes yields only 2 and the next read yields 0 although the slave still holds data. */ Y_UNIT_TEST(TestLengthLimitedInput) { + char buf[16]; + + TStringStream s1("abcd"); + TLengthLimitedInput l1(&s1, 2); + UNIT_ASSERT_VALUES_EQUAL(l1.Load(buf, 3), 2); + UNIT_ASSERT_VALUES_EQUAL(l1.Read(buf, 1), 0); + } + + /* The counter includes line terminators consumed by ReadLine (4 after the first line) and bytes that were skipped. */ Y_UNIT_TEST(TestCountingInput) { + char buf[16]; + + TStringStream s1("abc\ndef\n"); + TCountingInput l1(&s1); + + TString s; + l1.ReadLine(s); + UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 4); + + l1.Load(buf, 1); + UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 5); + + l1.Skip(1); + UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 6); + + l1.ReadLine(s); + UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 8); + } + + /* The counter accumulates across single-character, string and multi-part writes: 1 + 4 + 3 * 3 = 14. */ Y_UNIT_TEST(TestCountingOutput) { + TStringStream s1; + TCountingOutput l1(&s1); + + l1.Write('1'); + UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 1); + + l1.Write(TString("abcd")); + UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 5); + + TString buf("aaa"); + IOutputStream::TPart parts[] = {{buf.data(), buf.size()}, {buf.data(), buf.size()}, {buf.data(), buf.size()}}; + l1.Write(parts, 3); + UNIT_ASSERT_VALUES_EQUAL(l1.Counter(), 14); + } +} diff --git a/util/stream/mem.cpp b/util/stream/mem.cpp new file mode 100644 index 0000000000..22a3339e27 --- /dev/null +++ b/util/stream/mem.cpp @@ -0,0 +1,65 @@ +#include "mem.h" + +#include <util/generic/yexception.h> + +/* The default-constructed stream has no block and zero length. */ TMemoryInput::TMemoryInput() noexcept + : Buf_(nullptr) + , Len_(0) +{ +} + +TMemoryInput::TMemoryInput(const void* buf, size_t len) noexcept + : Buf_((const char*)buf) + , Len_(len) +{ +} + 
+TMemoryInput::TMemoryInput(const TStringBuf buf) noexcept + : Buf_(buf.data()) + , Len_(buf.size()) +{ +} + +TMemoryInput::~TMemoryInput() = default; + +size_t TMemoryInput::DoNext(const void** ptr, size_t len) { + len = Min(Len_, len); + + *ptr = Buf_; + Len_ -= len; + Buf_ += len; + return len; +} + +void TMemoryInput::DoUndo(size_t len) { + Len_ += len; + Buf_ -= len; +} + +TMemoryOutput::~TMemoryOutput() = default; + +size_t TMemoryOutput::DoNext(void** ptr) { + Y_ENSURE(Buf_ < End_, TStringBuf("memory output stream exhausted")); + *ptr = Buf_; + size_t bufferSize = End_ - Buf_; + Buf_ = End_; + + return bufferSize; +} + +void TMemoryOutput::DoUndo(size_t len) { + Buf_ -= len; +} + +void TMemoryOutput::DoWrite(const void* buf, size_t len) { + char* end = Buf_ + len; + Y_ENSURE(end <= End_, TStringBuf("memory output stream exhausted")); + + memcpy(Buf_, buf, len); + Buf_ = end; +} + +void TMemoryOutput::DoWriteC(char c) { + Y_ENSURE(Buf_ < End_, TStringBuf("memory output stream exhausted")); + *Buf_++ = c; +} diff --git a/util/stream/mem.h b/util/stream/mem.h new file mode 100644 index 0000000000..18a5d46772 --- /dev/null +++ b/util/stream/mem.h @@ -0,0 +1,255 @@ +#pragma once + +#include "zerocopy.h" +#include "zerocopy_output.h" + +#include <util/generic/strbuf.h> + +/** + * @addtogroup Streams_Memory + * @{ + */ + +/** + * Input stream that reads data from a memory block. + */ +class TMemoryInput: public IZeroCopyInputFastReadTo { +public: + TMemoryInput() noexcept; + + /** + * Constructs a stream that reads from the provided memory block. It's up + * to the user to make sure that the memory block doesn't get freed while + * this stream is in use. + * + * @param buf Memory block to use. + * @param len Size of the memory block. 
+ */ + TMemoryInput(const void* buf, size_t len) noexcept; + explicit TMemoryInput(const TStringBuf buf) noexcept; + ~TMemoryInput() override; + + /* Copying duplicates only the (pointer, length) view of the block; the base subobject is default-constructed rather than copied. */ TMemoryInput(const TMemoryInput& other) noexcept + : IZeroCopyInputFastReadTo() + , Buf_(other.Buf_) + , Len_(other.Len_) + { + } + + TMemoryInput& operator=(const TMemoryInput& other) noexcept { + if (this != &other) { + Buf_ = other.Buf_; + Len_ = other.Len_; + } + + return *this; + } + + TMemoryInput(TMemoryInput&&) noexcept = default; + TMemoryInput& operator=(TMemoryInput&&) noexcept = default; + + /** + * Initializes this stream with a new memory block. It's up to the + * user to make sure that the memory block doesn't get freed while this + * stream is in use. + * + * @param buf New memory block to use. + * @param len Size of the new memory block. + */ + void Reset(const void* buf, size_t len) noexcept { + Buf_ = (const char*)buf; + Len_ = len; + } + + /** + * @returns Whether there is more data in the stream. + */ + bool Exhausted() const noexcept { + return !Avail(); + } + + /** + * @returns Number of bytes available in the stream. + */ + size_t Avail() const noexcept { + return Len_; + } + + /** + * @returns Current read position in the memory block + * used by this stream. + */ + const char* Buf() const noexcept { + return Buf_; + } + + /** + * Initializes this stream with a next chunk extracted from the given zero + * copy stream. + * + * @param stream Zero copy stream to initialize from. + */ + void Fill(IZeroCopyInput* stream) { + Len_ = stream->Next(&Buf_); + /* An empty chunk resets the view to (nullptr, 0). */ if (!Len_) { + Reset(nullptr, 0); + } + } + +private: + size_t DoNext(const void** ptr, size_t len) override; + void DoUndo(size_t len) override; + +private: + const char* Buf_; + size_t Len_; +}; + +/** + * Output stream that writes data to a memory block. + */ +class TMemoryOutput: public IZeroCopyOutput { +public: + /** + * Constructs a stream that writes to the provided memory block. 
It's up + * to the user to make sure that the memory block doesn't get freed while + * this stream is in use. + * + * @param buf Memory block to use. + * @param len Size of the memory block. + */ + TMemoryOutput(void* buf, size_t len) noexcept + : Buf_(static_cast<char*>(buf)) + , End_(Buf_ + len) + { + } + ~TMemoryOutput() override; + + TMemoryOutput(TMemoryOutput&&) noexcept = default; + TMemoryOutput& operator=(TMemoryOutput&&) noexcept = default; + + /** + * Initializes this stream with a new memory block. It's up to the + * user to make sure that the memory block doesn't get freed while this + * stream is in use. + * + * @param buf New memory block to use. + * @param len Size of the new memory block. + */ + inline void Reset(void* buf, size_t len) noexcept { + Buf_ = static_cast<char*>(buf); + End_ = Buf_ + len; + } + + /** + * @returns Whether there is more space in the + * stream for writing. + */ + inline bool Exhausted() const noexcept { + return !Avail(); + } + + /** + * @returns Number of bytes available for writing + * in the stream. + */ + inline size_t Avail() const noexcept { + return End_ - Buf_; + } + + /** + * @returns Current write position in the memory block + * used by this stream. + */ + inline char* Buf() const noexcept { + return Buf_; + } + + /** + * @returns Pointer to the end of the memory block + * used by this stream. + */ + char* End() const { + return End_; + } + +private: + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; + void DoWrite(const void* buf, size_t len) override; + void DoWriteC(char c) override; + +protected: + /* Buf_ is the current write position and End_ is one past the last writable byte (the constructor and Reset set it to Buf_ + len); Avail() is their difference. */ char* Buf_; + char* End_; +}; + +/** + * Memory output stream that supports changing the position of the + * write pointer. 
+ * + * @see TMemoryOutput + */ +class TMemoryWriteBuffer: public TMemoryOutput { +public: + /* Beg_ remembers the start of the block so that the written part can be measured and returned later. */ TMemoryWriteBuffer(void* buf, size_t len) + : TMemoryOutput(buf, len) + , Beg_(Buf_) + { + } + + void Reset(void* buf, size_t len) { + TMemoryOutput::Reset(buf, len); + Beg_ = Buf_; + } + + /* Number of bytes between the start of the block and the current write position. */ size_t Len() const { + return Buf() - Beg(); + } + + /* NOTE(review): declared as size_t although the value is the result of a comparison, so callers receive 0 or 1. */ size_t Empty() const { + return Buf() == Beg(); + } + + /** + * @returns Data that has been written into this + * stream as a string. + */ + TStringBuf Str() const { + return TStringBuf(Beg(), Buf()); + } + + char* Beg() const { + return Beg_; + } + + /** + * @param ptr New write position for this stream. + * Must be inside the memory block that + * this stream uses. + */ + void SetPos(char* ptr) { + Y_ASSERT(Beg_ <= ptr); + SetPosImpl(ptr); + } + + /** + * @param pos New write position for this stream, + * relative to the beginning of the memory + * block that this stream uses. + */ + void SetPos(size_t pos) { + SetPosImpl(Beg_ + pos); + } + +protected: + /* Only the upper bound is asserted here; the lower bound is asserted by the pointer overload of SetPos. */ void SetPosImpl(char* ptr) { + Y_ASSERT(End_ >= ptr); + Buf_ = ptr; + } + +protected: + char* Beg_; +}; + +/** @} */ diff --git a/util/stream/mem_ut.cpp b/util/stream/mem_ut.cpp new file mode 100644 index 0000000000..f388ae66ac --- /dev/null +++ b/util/stream/mem_ut.cpp @@ -0,0 +1,78 @@ +#include "mem.h" + +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(TestMemIO) { + Y_UNIT_TEST(TestReadTo) { + TString s("0123456789abc"); + TMemoryInput in(s); + + TString t; + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, '7'), 8); + UNIT_ASSERT_VALUES_EQUAL(t, "0123456"); + UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 5); + UNIT_ASSERT_VALUES_EQUAL(t, "89abc"); + } + + /* Each round asks for a buffer with Next, fills a few bytes and returns the unused rest with Undo, so the following Next continues right after the written bytes. */ Y_UNIT_TEST(NextAndUndo) { + char buffer[20]; + TMemoryOutput output(buffer, sizeof(buffer)); + char* ptr = nullptr; + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT_GE(bufferSize, 1); + *ptr = '1'; + if (bufferSize > 1) { + output.Undo(bufferSize - 1); + } + + bufferSize = output.Next(&ptr); + 
UNIT_ASSERT_GE(bufferSize, 2); + *ptr = '2'; + *(ptr + 1) = '2'; + if (bufferSize > 2) { + output.Undo(bufferSize - 2); + } + + bufferSize = output.Next(&ptr); + UNIT_ASSERT_GE(bufferSize, 3); + *ptr = '3'; + *(ptr + 1) = '3'; + *(ptr + 2) = '3'; + if (bufferSize > 3) { + output.Undo(bufferSize - 3); + } + + output.Finish(); + + const char* const result = "1" + "22" + "333"; + UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result))); + } + + /* Only the first strlen(result) bytes of the buffer are compared. */ Y_UNIT_TEST(Write) { + char buffer[20]; + TMemoryOutput output(buffer, sizeof(buffer)); + output << "1" + << "22" + << "333" + << "4444" + << "55555"; + + const char* const result = "1" + "22" + "333" + "4444" + "55555"; + UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result))); + } + + Y_UNIT_TEST(WriteChars) { + char buffer[20]; + TMemoryOutput output(buffer, sizeof(buffer)); + output << '1' << '2' << '3' << '4' << '5' << '6' << '7' << '8' << '9' << '0'; + + const char* const result = "1234567890"; + UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result))); + } +} diff --git a/util/stream/multi.cpp b/util/stream/multi.cpp new file mode 100644 index 0000000000..b2354298a0 --- /dev/null +++ b/util/stream/multi.cpp @@ -0,0 +1,56 @@ +#include "null.h" +#include "multi.h" + +TMultiInput::TMultiInput(IInputStream* f, IInputStream* s) noexcept + : C_(f) + , N_(s) +{ +} + +TMultiInput::~TMultiInput() = default; + +/* Reads from the current stream. Once it returns 0 the second stream becomes the current one and Cnull takes its place as the next one, so later switches always land on the null stream. */ size_t TMultiInput::DoRead(void* buf, size_t len) { + const size_t ret = C_->Read(buf, len); + + if (ret) { + return ret; + } + + C_ = N_; + N_ = &Cnull; + + return C_->Read(buf, len); +} + +/* A return value of st.size() + 1 means the delimiter was found and consumed. Otherwise the current stream ended first: the search continues in the next stream, reading straight into st when nothing was read so far and appending through a temporary otherwise. */ size_t TMultiInput::DoReadTo(TString& st, char ch) { + size_t ret = C_->ReadTo(st, ch); + if (ret == st.size() + 1) { // found a symbol, not eof + return ret; + } + + C_ = N_; + N_ = &Cnull; + + if (ret == 0) { + ret += C_->ReadTo(st, ch); + } else { + TString tmp; + ret += C_->ReadTo(tmp, ch); + st += tmp; + } + + return ret; +} + +size_t TMultiInput::DoSkip(size_t len) { + const size_t ret = C_->Skip(len); + + if (ret) { + return 
ret; + } + + /* Same switch-over as in DoRead: the second stream becomes current and Cnull becomes the next one. */ C_ = N_; + N_ = &Cnull; + + return C_->Skip(len); +} diff --git a/util/stream/multi.h b/util/stream/multi.h new file mode 100644 index 0000000000..8bfd462d99 --- /dev/null +++ b/util/stream/multi.h @@ -0,0 +1,32 @@ +#pragma once + +#include "input.h" + +/** + * @addtogroup Streams_Multi + * @{ + */ + +/** + * A proxy input stream that concatenates two slave streams into one. + */ +class TMultiInput: public IInputStream { +public: + TMultiInput(IInputStream* f, IInputStream* s) noexcept; + ~TMultiInput() override; + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + size_t DoReadTo(TString& st, char ch) override; + +private: + /* C_ is the stream currently being read and N_ the one that replaces it when C_ runs dry. Both are plain pointers that the defaulted destructor does not delete. */ IInputStream* C_; + IInputStream* N_; +}; + +/** + * See also "util/stream/tee.h" for multi output. + */ + +/** @} */ diff --git a/util/stream/multi_ut.cpp b/util/stream/multi_ut.cpp new file mode 100644 index 0000000000..fc2553b533 --- /dev/null +++ b/util/stream/multi_ut.cpp @@ -0,0 +1,51 @@ +#include "mem.h" +#include "multi.h" +#include "str.h" +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(TestMultiInput) { + /* Fixture holding two memory inputs and the TMultiInput that concatenates them. */ struct TTestCase { + TMemoryInput Input1; + TMemoryInput Input2; + TMultiInput MultiInput; + TTestCase(const TStringBuf in1, const TStringBuf in2) + : Input1(in1) + , Input2(in2) + , MultiInput(&Input1, &Input2) + { + } + /* Calls ReadTo on the combined stream with t preset to initValue and checks both the return value and the resulting string. */ void TestReadToResult(char c, size_t expectedRetval, + const TString& expectedValue, + const TString& initValue = "") { + TString t = initValue; + UNIT_ASSERT_VALUES_EQUAL(MultiInput.ReadTo(t, c), expectedRetval); + UNIT_ASSERT_VALUES_EQUAL(t, expectedValue); + } + }; + + Y_UNIT_TEST(TestReadTo) { + TString t; + + TTestCase simpleCase("0123456789abc", "defghijk"); + simpleCase.TestReadToResult('7', 8, "0123456"); + simpleCase.TestReadToResult('f', 8, "89abcde"); + simpleCase.TestReadToResult('z', 5, "ghijk"); + } + + Y_UNIT_TEST(TestReadToBetweenStreams) { + TTestCase case1("0123456789abc", "defghijk"); + 
case1.TestReadToResult('c', 13, "0123456789ab"); + case1.TestReadToResult('k', 8, "defghij"); + /* Both streams are exhausted here: the call returns 0 and the preset value stays untouched. */ case1.TestReadToResult('z', 0, "TRASH", "TRASH"); + + TTestCase case2("0123456789abc", "defghijk"); + case2.TestReadToResult('d', 14, "0123456789abc"); + case2.TestReadToResult('j', 6, "efghi"); + case2.TestReadToResult('k', 1, "", "TRASH"); + + TTestCase case3("0123456789abc", "defghijk"); + case3.TestReadToResult('e', 15, "0123456789abcd"); + case3.TestReadToResult('j', 5, "fghi"); + case3.TestReadToResult('k', 1, "", "TRASH"); + } +} diff --git a/util/stream/null.cpp b/util/stream/null.cpp new file mode 100644 index 0000000000..4e8b298145 --- /dev/null +++ b/util/stream/null.cpp @@ -0,0 +1,36 @@ +#include "null.h" + +#include <util/generic/singleton.h> + +/* Returns the shared null stream object, obtained through SingletonWithPriority with priority 4. */ TNullIO& NPrivate::StdNullStream() noexcept { + return *SingletonWithPriority<TNullIO, 4>(); +} + +TNullInput::TNullInput() noexcept { +} + +TNullInput::~TNullInput() = default; + +/* All read-side operations of the null input report zero bytes. */ size_t TNullInput::DoRead(void*, size_t) { + return 0; +} + +size_t TNullInput::DoSkip(size_t) { + return 0; +} + +size_t TNullInput::DoNext(const void**, size_t) { + return 0; +} + +TNullOutput::TNullOutput() noexcept = default; + +TNullOutput::~TNullOutput() = default; + +/* Intentionally empty: written data is dropped. */ void TNullOutput::DoWrite(const void* /*buf*/, size_t /*len*/) { +} + +TNullIO::TNullIO() noexcept { +} + +TNullIO::~TNullIO() = default; diff --git a/util/stream/null.h b/util/stream/null.h new file mode 100644 index 0000000000..8c335a9a78 --- /dev/null +++ b/util/stream/null.h @@ -0,0 +1,61 @@ +#pragma once + +#include "zerocopy.h" +#include "output.h" + +/** + * @addtogroup Streams + * @{ + */ + +/** + * Null input stream. Does nothing, contains no data. + */ +class TNullInput: public IZeroCopyInput { +public: + TNullInput() noexcept; + ~TNullInput() override; + +private: + size_t DoRead(void* buf, size_t len) override; + size_t DoSkip(size_t len) override; + size_t DoNext(const void** ptr, size_t len) override; +}; + +/** + * Null output stream. 
Just ignores whatever is written into it. + */ +class TNullOutput: public IOutputStream { +public: + TNullOutput() noexcept; + ~TNullOutput() override; + + TNullOutput(TNullOutput&&) noexcept = default; + TNullOutput& operator=(TNullOutput&&) noexcept = default; + +private: + void DoWrite(const void* buf, size_t len) override; +}; + +/** + * Null input-output stream. + * + * @see TNullInput + * @see TNullOutput + */ +class TNullIO: public TNullInput, public TNullOutput { +public: + TNullIO() noexcept; + ~TNullIO() override; +}; + +namespace NPrivate { + /* Accessor behind the Cnull macro defined below. */ TNullIO& StdNullStream() noexcept; +} + +/** + * Standard null stream. + */ +#define Cnull (::NPrivate::StdNullStream()) + +/** @} */ diff --git a/util/stream/output.cpp b/util/stream/output.cpp new file mode 100644 index 0000000000..db81b81b70 --- /dev/null +++ b/util/stream/output.cpp @@ -0,0 +1,428 @@ +#include "output.h" + +#include <util/string/cast.h> +#include "format.h" +#include <util/memory/tempbuf.h> +#include <util/generic/singleton.h> +#include <util/generic/yexception.h> +#include <util/charset/utf8.h> +#include <util/charset/wide.h> + +#if defined(_android_) + #include <util/system/dynlib.h> + #include <util/system/guard.h> + #include <util/system/mutex.h> + #include <android/log.h> +#endif + +#include <cerrno> +#include <string> +#include <string_view> +#include <cstdio> + +#if defined(_win_) + #include <io.h> +#endif + +constexpr size_t MAX_UTF8_BYTES = 4; // UTF-8-encoded code point takes between 1 and 4 bytes + +IOutputStream::IOutputStream() noexcept = default; + +IOutputStream::~IOutputStream() = default; + +void IOutputStream::DoFlush() { + /* + * do nothing + */ +} + +/* Default finish step: just flushes. */ void IOutputStream::DoFinish() { + Flush(); +} + +/* Default vectored write: one DoWrite call per part, in order. */ void IOutputStream::DoWriteV(const TPart* parts, size_t count) { + for (size_t i = 0; i < count; ++i) { + const TPart& part = parts[i]; + + DoWrite(part.buf, part.len); + } +} + +/* Default single-character write: a one-byte DoWrite. */ void IOutputStream::DoWriteC(char ch) { + DoWrite(&ch, 1); +} + +template <> +void 
Out<wchar16>(IOutputStream& o, wchar16 ch) { + /* The single code unit is decoded with ReadSymbol and then re-encoded as UTF-8 into a buffer of MAX_UTF8_BYTES bytes. */ const wchar32 w32ch = ReadSymbol(&ch, &ch + 1); + size_t length; + unsigned char buffer[MAX_UTF8_BYTES]; + WriteUTF8Char(w32ch, length, buffer); + o.Write(buffer, length); +} + +template <> +void Out<wchar32>(IOutputStream& o, wchar32 ch) { + size_t length; + unsigned char buffer[MAX_UTF8_BYTES]; + WriteUTF8Char(ch, length, buffer); + o.Write(buffer, length); +} + +/* Converts n wide characters to UTF-8 in a temporary buffer of n * MAX_UTF8_BYTES + 1 bytes and writes the converted bytes; the terminating zero stored in the buffer is not written. The wchar32 overload below has the same body. */ static void WriteString(IOutputStream& o, const wchar16* w, size_t n) { + const size_t buflen = (n * MAX_UTF8_BYTES); // * 4 because the conversion functions can convert unicode character into maximum 4 bytes of UTF8 + TTempBuf buffer(buflen + 1); + char* const data = buffer.Data(); + size_t written = 0; + WideToUTF8(w, n, data, written); + data[written] = 0; + o.Write(data, written); +} + +static void WriteString(IOutputStream& o, const wchar32* w, size_t n) { + const size_t buflen = (n * MAX_UTF8_BYTES); // * 4 because the conversion functions can convert unicode character into maximum 4 bytes of UTF8 + TTempBuf buffer(buflen + 1); + char* const data = buffer.Data(); + size_t written = 0; + WideToUTF8(w, n, data, written); + data[written] = 0; + o.Write(data, written); +} + +template <> +void Out<TString>(IOutputStream& o, const TString& p) { + o.Write(p.data(), p.size()); +} + +template <> +void Out<std::string>(IOutputStream& o, const std::string& p) { + o.Write(p.data(), p.length()); +} + +template <> +void Out<std::string_view>(IOutputStream& o, const std::string_view& p) { + o.Write(p.data(), p.length()); +} + +template <> +void Out<std::u16string_view>(IOutputStream& o, const std::u16string_view& p) { + WriteString(o, p.data(), p.length()); +} + +template <> +void Out<std::u32string_view>(IOutputStream& o, const std::u32string_view& p) { + WriteString(o, p.data(), p.length()); +} + +template <> +void Out<TStringBuf>(IOutputStream& o, const TStringBuf& p) { + o.Write(p.data(), p.length()); +} + +template <> +void Out<TWtringBuf>(IOutputStream& o, 
const TWtringBuf& p) { + WriteString(o, p.data(), p.length()); +} + +template <> +void Out<TUtf32StringBuf>(IOutputStream& o, const TUtf32StringBuf& p) { + WriteString(o, p.data(), p.length()); +} + +template <> +void Out<const wchar16*>(IOutputStream& o, const wchar16* w) { + if (w) { + WriteString(o, w, std::char_traits<wchar16>::length(w)); + } else { + o.Write("(null)"); + } +} + +template <> +void Out<const wchar32*>(IOutputStream& o, const wchar32* w) { + if (w) { + WriteString(o, w, std::char_traits<wchar32>::length(w)); + } else { + o.Write("(null)"); + } +} + +template <> +void Out<TUtf16String>(IOutputStream& o, const TUtf16String& w) { + WriteString(o, w.c_str(), w.size()); +} + +template <> +void Out<TUtf32String>(IOutputStream& o, const TUtf32String& w) { + WriteString(o, w.c_str(), w.size()); +} + +#define DEF_CONV_DEFAULT(type) \ + template <> \ + void Out<type>(IOutputStream & o, type p) { \ + o << ToString(p); \ + } + +#define DEF_CONV_CHR(type) \ + template <> \ + void Out<type>(IOutputStream & o, type p) { \ + o.Write((char)p); \ + } + +#define DEF_CONV_NUM(type, len) \ + template <> \ + void Out<type>(IOutputStream & o, type p) { \ + char buf[len]; \ + o.Write(buf, ToString(p, buf, sizeof(buf))); \ + } \ + \ + template <> \ + void Out<volatile type>(IOutputStream & o, volatile type p) { \ + Out<type>(o, p); \ + } + +DEF_CONV_NUM(bool, 64) + +DEF_CONV_CHR(char) +DEF_CONV_CHR(signed char) +DEF_CONV_CHR(unsigned char) + +DEF_CONV_NUM(signed short, 64) +DEF_CONV_NUM(signed int, 64) +DEF_CONV_NUM(signed long int, 64) +DEF_CONV_NUM(signed long long int, 64) + +DEF_CONV_NUM(unsigned short, 64) +DEF_CONV_NUM(unsigned int, 64) +DEF_CONV_NUM(unsigned long int, 64) +DEF_CONV_NUM(unsigned long long int, 64) + +DEF_CONV_NUM(float, 512) +DEF_CONV_NUM(double, 512) +DEF_CONV_NUM(long double, 512) + +#if !defined(_YNDX_LIBCXX_ENABLE_VECTOR_BOOL_COMPRESSION) || (_YNDX_LIBCXX_ENABLE_VECTOR_BOOL_COMPRESSION == 1) +// TODO: acknowledge std::bitset::reference for 
both libc++ and libstdc++ +template <> +void Out<typename std::vector<bool>::reference>(IOutputStream& o, const std::vector<bool>::reference& bit) { + return Out<bool>(o, static_cast<bool>(bit)); +} +#endif + +#ifndef TSTRING_IS_STD_STRING +template <> +void Out<TBasicCharRef<TString>>(IOutputStream& o, const TBasicCharRef<TString>& c) { + o << static_cast<char>(c); +} + +template <> +void Out<TBasicCharRef<TUtf16String>>(IOutputStream& o, const TBasicCharRef<TUtf16String>& c) { + o << static_cast<wchar16>(c); +} + +template <> +void Out<TBasicCharRef<TUtf32String>>(IOutputStream& o, const TBasicCharRef<TUtf32String>& c) { + o << static_cast<wchar32>(c); +} +#endif + +template <> +void Out<const void*>(IOutputStream& o, const void* t) { + o << Hex(size_t(t)); +} + +template <> +void Out<void*>(IOutputStream& o, void* t) { + Out<const void*>(o, t); +} + +using TNullPtr = decltype(nullptr); + +template <> +void Out<TNullPtr>(IOutputStream& o, TTypeTraits<TNullPtr>::TFuncParam) { + o << TStringBuf("nullptr"); +} + +#if defined(_android_) +namespace { + class TAndroidStdIOStreams { + public: + TAndroidStdIOStreams() + : LogLibrary("liblog.so") + , LogFuncPtr((TLogFuncPtr)LogLibrary.Sym("__android_log_write")) + , Out(LogFuncPtr) + , Err(LogFuncPtr) + { + } + + public: + using TLogFuncPtr = void (*)(int, const char*, const char*); + + class TAndroidStdOutput: public IOutputStream { + public: + inline TAndroidStdOutput(TLogFuncPtr logFuncPtr) noexcept + : Buffer() + , LogFuncPtr(logFuncPtr) + { + } + + virtual ~TAndroidStdOutput() { + } + + private: + virtual void DoWrite(const void* buf, size_t len) override { + with_lock (BufferMutex) { + Buffer.Write(buf, len); + } + } + + virtual void DoFlush() override { + with_lock (BufferMutex) { + LogFuncPtr(ANDROID_LOG_DEBUG, GetTag(), Buffer.Data()); + Buffer.Clear(); + } + } + + virtual const char* GetTag() const = 0; + + private: + TMutex BufferMutex; + TStringStream Buffer; + TLogFuncPtr LogFuncPtr; + }; + + class TStdErr: 
public TAndroidStdOutput { + public: + TStdErr(TLogFuncPtr logFuncPtr) + : TAndroidStdOutput(logFuncPtr) + { + } + + virtual ~TStdErr() { + } + + private: + virtual const char* GetTag() const override { + return "stderr"; + } + }; + + class TStdOut: public TAndroidStdOutput { + public: + TStdOut(TLogFuncPtr logFuncPtr) + : TAndroidStdOutput(logFuncPtr) + { + } + + virtual ~TStdOut() { + } + + private: + virtual const char* GetTag() const override { + return "stdout"; + } + }; + + static bool Enabled; + TDynamicLibrary LogLibrary; // field order is important, see constructor + TLogFuncPtr LogFuncPtr; + TStdOut Out; + TStdErr Err; + + static inline TAndroidStdIOStreams& Instance() { + return *SingletonWithPriority<TAndroidStdIOStreams, 4>(); + } + }; + + bool TAndroidStdIOStreams::Enabled = false; +} +#endif // _android_ + +namespace { + class TStdOutput: public IOutputStream { + public: + inline TStdOutput(FILE* f) noexcept + : F_(f) + { + } + + ~TStdOutput() override = default; + + private: + void DoWrite(const void* buf, size_t len) override { + if (len != fwrite(buf, 1, len, F_)) { +#if defined(_win_) + // On Windows, if 'F_' is console -- 'fwrite' returns count of written characters. + // If, for example, console output codepage is UTF-8, then returned value is + // not equal to 'len'. So, we ignore some 'errno' values... 
+ if ((errno == 0 || errno == EINVAL || errno == EILSEQ) && _isatty(fileno(F_))) { + return; + } +#endif + ythrow TSystemError() << "write failed"; + } + } + + void DoFlush() override { + if (fflush(F_) != 0) { + ythrow TSystemError() << "fflush failed"; + } + } + + private: + FILE* F_; + }; + + struct TStdIOStreams { + struct TStdErr: public TStdOutput { + inline TStdErr() + : TStdOutput(stderr) + { + } + + ~TStdErr() override = default; + }; + + struct TStdOut: public TStdOutput { + inline TStdOut() + : TStdOutput(stdout) + { + } + + ~TStdOut() override = default; + }; + + TStdOut Out; + TStdErr Err; + + static inline TStdIOStreams& Instance() { + return *SingletonWithPriority<TStdIOStreams, 4>(); + } + }; +} + +IOutputStream& NPrivate::StdErrStream() noexcept { +#if defined(_android_) + if (TAndroidStdIOStreams::Enabled) { + return TAndroidStdIOStreams::Instance().Err; + } +#endif + return TStdIOStreams::Instance().Err; +} + +IOutputStream& NPrivate::StdOutStream() noexcept { +#if defined(_android_) + if (TAndroidStdIOStreams::Enabled) { + return TAndroidStdIOStreams::Instance().Out; + } +#endif + return TStdIOStreams::Instance().Out; +} + +void RedirectStdioToAndroidLog(bool redirect) { +#if defined(_android_) + TAndroidStdIOStreams::Enabled = redirect; +#else + Y_UNUSED(redirect); +#endif +} diff --git a/util/stream/output.h b/util/stream/output.h new file mode 100644 index 0000000000..00eef50b95 --- /dev/null +++ b/util/stream/output.h @@ -0,0 +1,304 @@ +#pragma once + +#include "fwd.h" +#include "labeled.h" + +#include <util/generic/noncopyable.h> +#include <util/generic/string.h> +#include <util/generic/strbuf.h> +#include <util/generic/typetraits.h> + +#include <type_traits> + +/** + * @addtogroup Streams_Base + * @{ + */ + +/** + * Abstract output stream. + */ +class IOutputStream: public TNonCopyable { +public: + /** + * Data block for output. 
+ */ + struct TPart { + inline TPart(const void* Buf, size_t Len) noexcept + : buf(Buf) + , len(Len) + { + } + + inline TPart(const TStringBuf s) noexcept + : buf(s.data()) + , len(s.size()) + { + } + + inline TPart() noexcept + : buf(nullptr) + , len(0) + { + } + + inline ~TPart() = default; + + static inline TPart CrLf() noexcept { + return TPart("\r\n", 2); + } + + const void* buf; + size_t len; + }; + + IOutputStream() noexcept; + virtual ~IOutputStream(); + + IOutputStream(IOutputStream&&) noexcept { + } + + IOutputStream& operator=(IOutputStream&&) noexcept { + return *this; + }; + + /** + * Writes into this stream. + * + * @param buf Data to write. + * @param len Number of bytes to write. + */ + inline void Write(const void* buf, size_t len) { + if (len) { + DoWrite(buf, len); + } + } + + /** + * Writes a string into this stream. + * + * @param st String to write. + */ + inline void Write(const TStringBuf st) { + Write(st.data(), st.size()); + } + + /** + * Writes several data blocks into this stream. + * + * @param parts Pointer to the start of the data blocks + * array. + * @param count Number of data blocks to write. + */ + inline void Write(const TPart* parts, size_t count) { + if (count > 1) { + DoWriteV(parts, count); + } else if (count) { + DoWrite(parts->buf, parts->len); + } + } + + /** + * Writes a single character into this stream. + * + * @param ch Character to write. + */ + inline void Write(char ch) { + DoWriteC(ch); + } + + /** + * Flushes this stream's buffer, if any. + * + * Note that this can also be done with a `Flush` manipulator: + * @code + * stream << "some string" << Flush; + * @endcode + */ + inline void Flush() { + DoFlush(); + } + + /** + * Flushes and closes this stream. No more data can be written into a stream + * once it's closed. + */ + inline void Finish() { + DoFinish(); + } + +protected: + /** + * Writes into this stream. + * + * @param buf Data to write. + * @param len Number of bytes to write. 
+ * @throws yexception If IO error occurs. + */ + virtual void DoWrite(const void* buf, size_t len) = 0; + + /** + * Writes several data blocks into this stream. + * + * @param parts Pointer to the start of the data blocks + * array. + * @param count Number of data blocks to write. + * @throws yexception If IO error occurs. + */ + virtual void DoWriteV(const TPart* parts, size_t count); + + /** + * Writes a single character into this stream. Can be overridden with a faster implementation. + * + * @param ch Character to write. + */ + virtual void DoWriteC(char ch); + + /** + * Flushes this stream's buffer, if any. + * + * @throws yexception If IO error occurs. + */ + virtual void DoFlush(); + + /** + * Flushes and closes this stream. No more data can be written into a stream + * once it's closed. + * + * @throws yexception If IO error occurs. + */ + virtual void DoFinish(); +}; + +/** + * `operator<<` for `IOutputStream` by default delegates to this function. + * + * Note that while `operator<<` uses overloading (and thus argument-dependent + * lookup), `Out` uses template specializations. This makes it possible to + * have a single `Out` declaration, and then just provide specializations in + * cpp files, letting the linker figure everything else out. This approach + * reduces compilation times. + * + * However, if the flexibility of overload resolution is needed, then one should + * just overload `operator<<`. + * + * @param out Output stream to write into. + * @param value Value to write. 
+ */ +template <class T> +void Out(IOutputStream& out, typename TTypeTraits<T>::TFuncParam value); + +#define Y_DECLARE_OUT_SPEC(MODIF, T, stream, value) \ + template <> \ + MODIF void Out<T>(IOutputStream & stream, TTypeTraits<T>::TFuncParam value) + +template <> +inline void Out<const char*>(IOutputStream& o, const char* t) { + if (t) { + o.Write(t); + } else { + o.Write("(null)"); + } +} + +template <> +void Out<const wchar16*>(IOutputStream& o, const wchar16* w); + +template <> +void Out<const wchar32*>(IOutputStream& o, const wchar32* w); + +static inline IOutputStream& operator<<(IOutputStream& o, TStreamManipulator m) { + m(o); + + return o; +} + +static inline IOutputStream& operator<<(IOutputStream& o, const char* t) { + Out<const char*>(o, t); + + return o; +} + +static inline IOutputStream& operator<<(IOutputStream& o, char* t) { + Out<const char*>(o, t); + + return o; +} + +template <class T> +static inline std::enable_if_t<std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, T t) { + Out<T>(o, t); + + return o; +} + +template <class T> +static inline std::enable_if_t<!std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, const T& t) { + Out<T>(o, t); + + return o; +} + +static inline IOutputStream& operator<<(IOutputStream& o, const wchar16* t) { + Out<const wchar16*>(o, t); + return o; +} + +static inline IOutputStream& operator<<(IOutputStream& o, wchar16* t) { + Out<const wchar16*>(o, t); + return o; +} + +static inline IOutputStream& operator<<(IOutputStream& o, const wchar32* t) { + Out<const wchar32*>(o, t); + return o; +} + +static inline IOutputStream& operator<<(IOutputStream& o, wchar32* t) { + Out<const wchar32*>(o, t); + return o; +} + +namespace NPrivate { + IOutputStream& StdOutStream() noexcept; + IOutputStream& StdErrStream() noexcept; +} + +/** + * Standard output stream. + */ +#define Cout (::NPrivate::StdOutStream()) + +/** + * Standard error stream. 
+ */ +#define Cerr (::NPrivate::StdErrStream()) + +/** + * Standard log stream. + */ +#define Clog Cerr + +/** + * End-of-line output manipulator, basically the same as `std::endl`. + */ +static inline void Endl(IOutputStream& o) { + (o << '\n').Flush(); +} + +/** + * Flushing stream manipulator, basically the same as `std::flush`. + */ +static inline void Flush(IOutputStream& o) { + o.Flush(); +} + +/* + * Also see format.h for additional manipulators. + */ + +#include "debug.h" + +void RedirectStdioToAndroidLog(bool redirect); + +/** @} */ diff --git a/util/stream/output.pxd b/util/stream/output.pxd new file mode 100644 index 0000000000..2fccc26d9b --- /dev/null +++ b/util/stream/output.pxd @@ -0,0 +1,12 @@ +from util.generic.string cimport TStringBuf + + +cdef extern from "<util/stream/output.h>" nogil: + cdef cppclass IOutputStream: + IOutputStream() + void Flush() except+ + void Finish() except+ + + void WriteChar "Write"(char) except+ + void WriteBuf "Write"(const TStringBuf) except+ + void Write(const void*, size_t) except+ diff --git a/util/stream/pipe.cpp b/util/stream/pipe.cpp new file mode 100644 index 0000000000..51be1934a7 --- /dev/null +++ b/util/stream/pipe.cpp @@ -0,0 +1,121 @@ +#include "pipe.h" + +#include <util/generic/yexception.h> + +#include <cstdio> +#include <cerrno> + +class TPipeBase::TImpl { +public: + inline TImpl(const TString& command, const char* mode) + : Pipe_(nullptr) + { +#ifndef _freebsd_ + if (strcmp(mode, "r+") == 0) { + ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD"; + } +#endif + Pipe_ = ::popen(command.data(), mode); + if (Pipe_ == nullptr) { + ythrow TSystemError() << "failed to open pipe: " << command.Quote(); + } + } + + inline ~TImpl() { + if (Pipe_ != nullptr) { + ::pclose(Pipe_); + } + } + +public: + FILE* Pipe_; +}; + +TPipeBase::TPipeBase(const TString& command, const char* mode) + : Impl_(new TImpl(command, mode)) +{ +} + +TPipeBase::~TPipeBase() = default; + 
+TPipeInput::TPipeInput(const TString& command) + : TPipeBase(command, "r") +{ +} + +size_t TPipeInput::DoRead(void* buf, size_t len) { + if (Impl_->Pipe_ == nullptr) { + return 0; + } + + size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_); + if (bytesRead == 0) { + int exitStatus = ::pclose(Impl_->Pipe_); + Impl_->Pipe_ = nullptr; + if (exitStatus == -1) { + ythrow TSystemError() << "pclose() failed"; + } else if (exitStatus != 0) { + ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")"; + } + } + return bytesRead; +} + +TPipeOutput::TPipeOutput(const TString& command) + : TPipeBase(command, "w") +{ +} + +void TPipeOutput::DoWrite(const void* buf, size_t len) { + if (Impl_->Pipe_ == nullptr || len != ::fwrite(buf, 1, len, Impl_->Pipe_)) { + ythrow TSystemError() << "fwrite failed"; + } +} + +void TPipeOutput::Close() { + int exitStatus = ::pclose(Impl_->Pipe_); + Impl_->Pipe_ = nullptr; + if (exitStatus == -1) { + ythrow TSystemError() << "pclose() failed"; + } else if (exitStatus != 0) { + ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")"; + } +} + +TPipedBase::TPipedBase(PIPEHANDLE fd) + : Handle_(fd) +{ +} + +TPipedBase::~TPipedBase() { + if (Handle_.IsOpen()) { + Handle_.Close(); + } +} + +TPipedInput::TPipedInput(PIPEHANDLE fd) + : TPipedBase(fd) +{ +} + +TPipedInput::~TPipedInput() = default; + +size_t TPipedInput::DoRead(void* buf, size_t len) { + if (!Handle_.IsOpen()) { + return 0; + } + return Handle_.Read(buf, len); +} + +TPipedOutput::TPipedOutput(PIPEHANDLE fd) + : TPipedBase(fd) +{ +} + +TPipedOutput::~TPipedOutput() = default; + +void TPipedOutput::DoWrite(const void* buf, size_t len) { + if (!Handle_.IsOpen() || static_cast<ssize_t>(len) != Handle_.Write(buf, len)) { + ythrow TSystemError() << "pipe writing failed"; + } +} diff --git a/util/stream/pipe.h b/util/stream/pipe.h new file mode 100644 index 0000000000..18525b9517 --- /dev/null +++ b/util/stream/pipe.h @@ -0,0 +1,112 
@@ +#pragma once + +#include "input.h" +#include "output.h" + +#include <util/system/pipe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> + +/** + * @addtogroup Streams_Pipes + * @{ + */ + +/** + * Base class for starting a process and communicating with it via pipes. + */ +class TPipeBase { +protected: + /** + * Starts a new process and opens a pipe. + * + * @param command Command line to start a process with. + * @param mode Data transfer mode for the pipe. Use + * "r" for reading and "w" for writing. + */ + TPipeBase(const TString& command, const char* mode); + virtual ~TPipeBase(); + +protected: + class TImpl; + THolder<TImpl> Impl_; +}; + +/** + * Input stream that binds to a standard output stream of a newly started process. + * + * Note that if the process ends with non-zero exit status, `Read` function will + * throw an exception. + */ +class TPipeInput: protected TPipeBase, public IInputStream { +public: + /** + * Starts a new process and opens a pipe. + * + * @param command Command line to start a process with. + */ + TPipeInput(const TString& command); + +private: + size_t DoRead(void* buf, size_t len) override; +}; + +/** + * Output stream that binds to a standard input stream of a newly started process. + * + * Note that if the process ends with non-zero exit status, `Close` function will + * throw an exception. + */ +class TPipeOutput: protected TPipeBase, public IOutputStream { +public: + /** + * Starts a new process and opens a pipe. + * + * @param command Command line to start a process with. + */ + TPipeOutput(const TString& command); + + /** + * Waits for the process to terminate and throws an exception if it ended + * with a non-zero exit status. 
+ */ + void Close(); + +private: + void DoWrite(const void* buf, size_t len) override; +}; + +class TPipedBase { +protected: + TPipedBase(PIPEHANDLE fd); + virtual ~TPipedBase(); + +protected: + TPipeHandle Handle_; +}; + +/** + * Input stream that binds to a standard output stream of an existing process. + */ +class TPipedInput: public TPipedBase, public IInputStream { +public: + TPipedInput(PIPEHANDLE fd); + ~TPipedInput() override; + +private: + size_t DoRead(void* buf, size_t len) override; +}; + +/** + * Output stream that binds to a standard input stream of an existing process. + */ +class TPipedOutput: public TPipedBase, public IOutputStream { +public: + TPipedOutput(PIPEHANDLE fd); + ~TPipedOutput() override; + +private: + void DoWrite(const void* buf, size_t len) override; +}; + +/** @} */ diff --git a/util/stream/printf.cpp b/util/stream/printf.cpp new file mode 100644 index 0000000000..f3eeca7afc --- /dev/null +++ b/util/stream/printf.cpp @@ -0,0 +1,51 @@ +#include "output.h" +#include "printf.h" + +#include <util/generic/scope.h> +#include <util/memory/tempbuf.h> +#include <util/generic/yexception.h> + +size_t Printf(IOutputStream& out, const char* fmt, ...) 
{ + va_list lst; + va_start(lst, fmt); + + Y_DEFER { + va_end(lst); + }; + + return Printf(out, fmt, lst); +} + +static inline size_t TryPrintf(void* ptr, size_t len, IOutputStream& out, const char* fmt, va_list params) { + va_list lst; + va_copy(lst, params); + const int ret = vsnprintf((char*)ptr, len, fmt, lst); + va_end(lst); + + if (ret < 0) { + return len; + } + + if ((size_t)ret < len) { + out.Write(ptr, (size_t)ret); + } + + return (size_t)ret; +} + +size_t Printf(IOutputStream& out, const char* fmt, va_list params) { + size_t guess = 0; + + while (true) { + TTempBuf tmp(guess); + const size_t ret = TryPrintf(tmp.Data(), tmp.Size(), out, fmt, params); + + if (ret < tmp.Size()) { + return ret; + } + + guess = Max(tmp.Size() * 2, ret + 1); + } + + return 0; +} diff --git a/util/stream/printf.h b/util/stream/printf.h new file mode 100644 index 0000000000..1c7ddc0664 --- /dev/null +++ b/util/stream/printf.h @@ -0,0 +1,25 @@ +#pragma once + +#include <util/system/compat.h> + +class IOutputStream; + +/** + * Stream-based `printf` function. Prints formatted data into the provided stream. + * Works the same way as a standard C `printf`. + * + * @param out Stream to write into. + * @param fmt Format string. + * @param ... Additional arguments. + */ +size_t Y_PRINTF_FORMAT(2, 3) Printf(IOutputStream& out, const char* fmt, ...); + +/** + * Stream-based `vprintf` function. Prints formatted data from variable argument + * list into the provided stream. Works the same way as a standard C `vprintf`. + * + * @param out Stream to write into. + * @param fmt Format string. + * @param params Additional arguments as a variable argument list. 
+ */ +size_t Y_PRINTF_FORMAT(2, 0) Printf(IOutputStream& out, const char* fmt, va_list params); diff --git a/util/stream/printf_ut.cpp b/util/stream/printf_ut.cpp new file mode 100644 index 0000000000..0eab167062 --- /dev/null +++ b/util/stream/printf_ut.cpp @@ -0,0 +1,33 @@ +#include "null.h" +#include "printf.h" +#include "str.h" + +#include <util/generic/string.h> + +#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(TStreamPrintfTest) { + Y_UNIT_TEST(TestPrintf) { + TStringStream ss; + + UNIT_ASSERT_EQUAL(Printf(ss, "qw %s %d", "er", 1), 7); + UNIT_ASSERT_EQUAL(ss.Str(), "qw er 1"); + } + +#ifdef __GNUC__ + #pragma GCC diagnostic ignored "-Wformat-zero-length" +#endif // __GNUC__ + + Y_UNIT_TEST(TestZeroString) { + UNIT_ASSERT_EQUAL(Printf(Cnull, ""), 0); + } + + Y_UNIT_TEST(TestLargePrintf) { + TString s = NUnitTest::RandomString(1000000); + TStringStream ss; + + Printf(ss, "%s", s.data()); + + UNIT_ASSERT_EQUAL(ss.Str(), s); + } +} diff --git a/util/stream/str.cpp b/util/stream/str.cpp new file mode 100644 index 0000000000..13f0e8ef28 --- /dev/null +++ b/util/stream/str.cpp @@ -0,0 +1,44 @@ +#include "str.h" + +static constexpr size_t MIN_BUFFER_GROW_SIZE = 16; + +TStringInput::~TStringInput() = default; + +size_t TStringInput::DoNext(const void** ptr, size_t len) { + len = Min(len, S_->size() - Pos_); + *ptr = S_->data() + Pos_; + Pos_ += len; + return len; +} + +void TStringInput::DoUndo(size_t len) { + Y_VERIFY(len <= Pos_); + Pos_ -= len; +} + +TStringOutput::~TStringOutput() = default; + +size_t TStringOutput::DoNext(void** ptr) { + if (S_->size() == S_->capacity()) { + S_->reserve(FastClp2(S_->capacity() + MIN_BUFFER_GROW_SIZE)); + } + size_t previousSize = S_->size(); + ResizeUninitialized(*S_, S_->capacity()); + *ptr = S_->begin() + previousSize; + return S_->size() - previousSize; +} + +void TStringOutput::DoUndo(size_t len) { + Y_VERIFY(len <= S_->size(), "trying to undo more bytes than actually written"); + 
S_->resize(S_->size() - len); +} + +void TStringOutput::DoWrite(const void* buf, size_t len) { + S_->append((const char*)buf, len); +} + +void TStringOutput::DoWriteC(char c) { + S_->push_back(c); +} + +TStringStream::~TStringStream() = default; diff --git a/util/stream/str.h b/util/stream/str.h new file mode 100644 index 0000000000..028bd572c0 --- /dev/null +++ b/util/stream/str.h @@ -0,0 +1,215 @@ +#pragma once + +#include "zerocopy.h" +#include "zerocopy_output.h" + +#include <util/generic/string.h> +#include <util/generic/noncopyable.h> +#include <util/generic/store_policy.h> + +/** + * @addtogroup Streams_Strings + * @{ + */ + +/** + * Input stream for reading data from a string. + */ +class TStringInput: public IZeroCopyInputFastReadTo { +public: + /** + * Constructs a string input stream that reads character data from the + * provided string. + * + * Note that this stream keeps a reference to the provided string, so it's + * up to the user to make sure that the string doesn't get destroyed while + * this stream is in use. + * + * For reading data from `TStringBuf`s, see `TMemoryInput` (`util/stream/mem.h`). + * + * @param s String to read from. + */ + inline TStringInput(const TString& s) noexcept + : S_(&s) + , Pos_(0) + { + } + + TStringInput(const TString&&) = delete; + + ~TStringInput() override; + + TStringInput(TStringInput&&) noexcept = default; + TStringInput& operator=(TStringInput&&) noexcept = default; + + inline void Swap(TStringInput& s) noexcept { + DoSwap(S_, s.S_); + DoSwap(Pos_, s.Pos_); + } + +protected: + size_t DoNext(const void** ptr, size_t len) override; + void DoUndo(size_t len) override; + +private: + const TString* S_; + size_t Pos_; + + friend class TStringStream; +}; + +/** + * Stream for writing data into a string. + */ +class TStringOutput: public IZeroCopyOutput { +public: + /** + * Constructs a string output stream that appends character data to the + * provided string. 
+ * + * Note that this stream keeps a reference to the provided string, so it's + * up to the user to make sure that the string doesn't get destroyed while + * this stream is in use. + * + * @param s String to append to. + */ + inline TStringOutput(TString& s) noexcept + : S_(&s) + { + } + + TStringOutput(TStringOutput&& s) noexcept = default; + + ~TStringOutput() override; + + /** + * @param size Number of additional characters to + * reserve in output string. + */ + inline void Reserve(size_t size) { + S_->reserve(S_->size() + size); + } + + inline void Swap(TStringOutput& s) noexcept { + DoSwap(S_, s.S_); + } + +protected: + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; + void DoWrite(const void* buf, size_t len) override; + void DoWriteC(char c) override; + +private: + TString* S_; +}; + +/** + * String input/output stream, similar to `std::stringstream`. + */ +class TStringStream: private TEmbedPolicy<TString>, public TStringInput, public TStringOutput { + using TEmbeddedString = TEmbedPolicy<TString>; + +public: + inline TStringStream() + : TEmbeddedString() + , TStringInput(*TEmbeddedString::Ptr()) + , TStringOutput(*TEmbeddedString::Ptr()) + { + } + + inline TStringStream(const TString& string) + : TEmbeddedString(string) + , TStringInput(*TEmbeddedString::Ptr()) + , TStringOutput(*TEmbeddedString::Ptr()) + { + } + + inline TStringStream(const TStringStream& other) + : TEmbeddedString(other.Str()) + , TStringInput(*TEmbeddedString::Ptr()) + , TStringOutput(*TEmbeddedString::Ptr()) + { + } + + inline TStringStream& operator=(const TStringStream& other) { + // All references remain alive, we need to change position only + Str() = other.Str(); + Pos_ = other.Pos_; + + return *this; + } + + ~TStringStream() override; + + /** + * @returns Whether @c this contains any data + */ + explicit operator bool() const noexcept { + return !Empty(); + } + + /** + * @returns String that this stream is writing into. 
+ */ + inline TString& Str() noexcept { + return *Ptr(); + } + + /** + * @returns String that this stream is writing into. + */ + inline const TString& Str() const noexcept { + return *Ptr(); + } + + /** + * @returns Pointer to the character data contained + * in this stream. The data is guaranteed + * to be null-terminated. + */ + inline const char* Data() const noexcept { + return Ptr()->data(); + } + + /** + * @returns Total number of characters in this + * stream. Note that this is not the same + * as the total number of characters + * available for reading. + */ + inline size_t Size() const noexcept { + return Ptr()->size(); + } + + /** + * @returns Whether the string that this stream + * operates on is empty. + */ + Y_PURE_FUNCTION inline bool Empty() const noexcept { + return Str().empty(); + } + + using TStringOutput::Reserve; + + /** + * Clears the string that this stream operates on and resets the + * read/write pointers. + */ + inline void Clear() { + Str().clear(); + Pos_ = 0; + } + + // TODO: compatibility with existing code, remove + + Y_PURE_FUNCTION bool empty() const { + return Empty(); + } + + void clear() { + Clear(); + } +}; + +/** @} */ diff --git a/util/stream/str.pxd b/util/stream/str.pxd new file mode 100644 index 0000000000..76dc16a822 --- /dev/null +++ b/util/stream/str.pxd @@ -0,0 +1,12 @@ +from util.generic.ptr cimport THolder +from util.generic.string cimport TString, TStringBuf +from util.stream.output cimport IOutputStream + + +cdef extern from "<util/stream/str.h>" nogil: + cdef cppclass TStringOutput(IOutputStream): + TStringOutput() except+ + TStringOutput(TString&) except+ + void Reserve(size_t) except+ + +ctypedef THolder[TStringOutput] TStringOutputPtr diff --git a/util/stream/str_ut.cpp b/util/stream/str_ut.cpp new file mode 100644 index 0000000000..fc6b46c31a --- /dev/null +++ b/util/stream/str_ut.cpp @@ -0,0 +1,152 @@ +#include "str.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <util/generic/typetraits.h> 
+ +template <typename T> +const T ReturnConstTemp(); + +Y_UNIT_TEST_SUITE(TStringInputOutputTest) { + Y_UNIT_TEST(Lvalue) { + TString str = "Hello, World!"; + TStringInput input(str); + + TString result = input.ReadAll(); + + UNIT_ASSERT_VALUES_EQUAL(result, str); + } + + Y_UNIT_TEST(ConstRef) { + TString str = "Hello, World!"; + const TString& r = str; + TStringInput input(r); + + TString result = input.ReadAll(); + + UNIT_ASSERT_VALUES_EQUAL(result, str); + } + + Y_UNIT_TEST(NonConstRef) { + TString str = "Hello, World!"; + TString& r = str; + TStringInput input(r); + + TString result = input.ReadAll(); + + UNIT_ASSERT_VALUES_EQUAL(result, str); + } + + Y_UNIT_TEST(Transfer) { + TString inputString = "some_string"; + TStringInput input(inputString); + + TString outputString; + TStringOutput output(outputString); + + TransferData(&input, &output); + + UNIT_ASSERT_VALUES_EQUAL(inputString, outputString); + } + + Y_UNIT_TEST(SkipReadAll) { + TString string0 = "All animals are equal, but some animals are more equal than others."; + + TString string1; + for (size_t i = 1; i <= string0.size(); i++) { + string1 += string0.substr(0, i); + } + + TStringInput input0(string1); + + size_t left = 5; + while (left > 0) { + left -= input0.Skip(left); + } + + TString string2 = input0.ReadAll(); + + UNIT_ASSERT_VALUES_EQUAL(string2, string1.substr(5)); + } + + Y_UNIT_TEST(OperatorBool) { + TStringStream str; + UNIT_ASSERT(!str); + str << "data"; + UNIT_ASSERT(str); + str.Clear(); + UNIT_ASSERT(!str); + } + + Y_UNIT_TEST(TestReadTo) { + TString s("0123456789abc"); + TString t; + + TStringInput in0(s); + UNIT_ASSERT_VALUES_EQUAL(in0.ReadTo(t, '7'), 8); + UNIT_ASSERT_VALUES_EQUAL(t, "0123456"); + UNIT_ASSERT_VALUES_EQUAL(in0.ReadTo(t, 'z'), 5); + UNIT_ASSERT_VALUES_EQUAL(t, "89abc"); + } + + Y_UNIT_TEST(WriteViaNextAndUndo) { + TString str1; + TStringOutput output(str1); + TString str2; + + for (size_t i = 0; i < 10000; ++i) { + str2.push_back('a' + (i % 20)); + } + + size_t written 
= 0; + void* ptr = nullptr; + while (written < str2.size()) { + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT(ptr && bufferSize > 0); + size_t toWrite = Min(bufferSize, str2.size() - written); + memcpy(ptr, str2.begin() + written, toWrite); + written += toWrite; + if (toWrite < bufferSize) { + output.Undo(bufferSize - toWrite); + } + } + + UNIT_ASSERT_STRINGS_EQUAL(str1, str2); + } + + Y_UNIT_TEST(Write) { + TString str; + TStringOutput output(str); + output << "1" + << "22" + << "333" + << "4444" + << "55555"; + + UNIT_ASSERT_STRINGS_EQUAL(str, "1" + "22" + "333" + "4444" + "55555"); + } + + Y_UNIT_TEST(WriteChars) { + TString str; + TStringOutput output(str); + output << '1' << '2' << '3' << '4' << '5' << '6' << '7' << '8' << '9' << '0'; + + UNIT_ASSERT_STRINGS_EQUAL(str, "1234567890"); + } + + Y_UNIT_TEST(MoveConstructor) { + TString str; + TStringOutput output1(str); + output1 << "foo"; + + TStringOutput output2 = std::move(output1); + output2 << "bar"; + UNIT_ASSERT_STRINGS_EQUAL(str, "foobar"); + + // Check old stream is in a valid state + output1 << "baz"; + } +} diff --git a/util/stream/str_ut.pyx b/util/stream/str_ut.pyx new file mode 100644 index 0000000000..2ae617303f --- /dev/null +++ b/util/stream/str_ut.pyx @@ -0,0 +1,62 @@ +# cython: c_string_type=str, c_string_encoding=utf8 + +from cython.operator cimport dereference + +from util.generic.ptr cimport THolder +from util.generic.string cimport TString, TStringBuf +from util.stream.str cimport TStringOutput, TStringOutputPtr + +import unittest + + +class TestStringOutput(unittest.TestCase): + def test_ctor1(self): + cdef TStringOutput output + + def test_ctor2(self): + cdef TString string + cdef THolder[TStringOutput] string_output = THolder[TStringOutput](new TStringOutput(string)) + + def test_write_char(self): + cdef TString string + cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string)) + + self.assertEqual(string, "") + 
dereference(string_output.Get()).WriteChar('1') + self.assertEqual(string, "1") + dereference(string_output.Get()).WriteChar('2') + self.assertEqual(string, "12") + dereference(string_output.Get()).WriteChar('3') + self.assertEqual(string, "123") + + def test_write_void(self): + cdef TString string + cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string)) + + self.assertEqual(string, "") + dereference(string_output.Get()).Write("1", 1) + self.assertEqual(string, "1") + dereference(string_output.Get()).Write("2", 1) + self.assertEqual(string, "12") + dereference(string_output.Get()).Write("34", 2) + self.assertEqual(string, "1234") + + def test_write_buf(self): + cdef TString string + cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string)) + + self.assertEqual(string, "") + dereference(string_output.Get()).WriteBuf(TStringBuf("1")) + self.assertEqual(string, "1") + dereference(string_output.Get()).WriteBuf(TStringBuf("2")) + self.assertEqual(string, "12") + dereference(string_output.Get()).WriteBuf(TStringBuf("34")) + self.assertEqual(string, "1234") + + def test_reserve(self): + cdef TString string + cdef TStringOutputPtr string_output = TStringOutputPtr(new TStringOutput(string)) + self.assertEqual(string, "") + dereference(string_output.Get()).Reserve(50) + self.assertEqual(string, "") + self.assertLessEqual(50, string.capacity()) diff --git a/util/stream/tee.cpp b/util/stream/tee.cpp new file mode 100644 index 0000000000..99873b95ba --- /dev/null +++ b/util/stream/tee.cpp @@ -0,0 +1,24 @@ +#include "tee.h" + +TTeeOutput::TTeeOutput(IOutputStream* l, IOutputStream* r) noexcept + : L_(l) + , R_(r) +{ +} + +TTeeOutput::~TTeeOutput() = default; + +void TTeeOutput::DoWrite(const void* buf, size_t len) { + L_->Write(buf, len); + R_->Write(buf, len); +} + +void TTeeOutput::DoFlush() { + L_->Flush(); + R_->Flush(); +} + +void TTeeOutput::DoFinish() { + L_->Finish(); + R_->Finish(); +} diff --git a/util/stream/tee.h 
b/util/stream/tee.h
new file mode 100644
index 0000000000..c69e232fb9
--- /dev/null
+++ b/util/stream/tee.h
@@ -0,0 +1,28 @@
+#pragma once
+
+#include "output.h"
+
+/**
+ * @addtogroup Streams_Multi
+ * @{
+ */
+
+/**
+ * A proxy output stream that writes into two slave streams simultaneously.
+ *
+ * Only two raw pointers are stored and no owning member is declared, so
+ * both slave streams have to stay alive for as long as this stream is used.
+ */
+class TTeeOutput: public IOutputStream {
+public:
+    /**
+     * @param l First slave stream.
+     * @param r Second slave stream.
+     */
+    TTeeOutput(IOutputStream* l, IOutputStream* r) noexcept;
+    ~TTeeOutput() override;
+
+private:
+    void DoWrite(const void* buf, size_t len) override;
+    void DoFlush() override;
+    void DoFinish() override;
+
+private:
+    IOutputStream* L_; // first slave stream (raw, non-owning pointer)
+    IOutputStream* R_; // second slave stream (raw, non-owning pointer)
+};
+
+/** @} */
diff --git a/util/stream/tempbuf.cpp b/util/stream/tempbuf.cpp
new file mode 100644
index 0000000000..801a1fabb0
--- /dev/null
+++ b/util/stream/tempbuf.cpp
@@ -0,0 +1,22 @@
+#include "tempbuf.h"
+
+namespace {
+    // Growth policy for the replacement buffer: twice the requested size.
+    static inline size_t Next(size_t size) noexcept {
+        return size * 2;
+    }
+}
+
+// Appends `len` bytes from `data`. If they do not fit into Left(), a new
+// TTempBuf of Next(filled + len) bytes is filled with the old contents
+// followed by the new data and then assigned over the TTempBuf base.
+// NOTE(review): Left()/Filled()/Append()/Data() come from TTempBuf, which is
+// not visible here; their exact semantics are assumed from the names.
+void TTempBufOutput::DoWrite(const void* data, size_t len) {
+    if (Y_LIKELY(len <= Left())) {
+        Append(data, len);
+    } else {
+        const size_t filled = Filled();
+
+        TTempBuf buf(Next(filled + len));
+
+        buf.Append(Data(), filled);
+        buf.Append(data, len);
+
+        // Only the TTempBuf base subobject is replaced.
+        static_cast<TTempBuf&>(*this) = buf;
+    }
+}
diff --git a/util/stream/tempbuf.h b/util/stream/tempbuf.h
new file mode 100644
index 0000000000..a6dc001025
--- /dev/null
+++ b/util/stream/tempbuf.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "output.h"
+
+#include <util/memory/tempbuf.h>
+
+/**
+ * Output stream that is itself a TTempBuf: written bytes are appended to the
+ * TTempBuf base, which is replaced by a larger buffer when data does not fit.
+ */
+class TTempBufOutput: public IOutputStream, public TTempBuf {
+public:
+    inline TTempBufOutput() = default;
+
+    /**
+     * @param size Forwarded to the TTempBuf constructor.
+     */
+    explicit TTempBufOutput(size_t size)
+        : TTempBuf(size)
+    {
+    }
+
+    TTempBufOutput(TTempBufOutput&&) noexcept = default;
+    TTempBufOutput& operator=(TTempBufOutput&&) noexcept = default;
+
+protected:
+    void DoWrite(const void* data, size_t len) override;
+};
diff --git a/util/stream/tokenizer.cpp b/util/stream/tokenizer.cpp
new file mode 100644
index 0000000000..44e719530a
--- /dev/null
+++
b/util/stream/tokenizer.cpp @@ -0,0 +1 @@ +#include "tokenizer.h" diff --git a/util/stream/tokenizer.h b/util/stream/tokenizer.h new file mode 100644 index 0000000000..b2398efdd1 --- /dev/null +++ b/util/stream/tokenizer.h @@ -0,0 +1,214 @@ +#pragma once + +#include "input.h" + +#include <util/generic/buffer.h> +#include <util/generic/mem_copy.h> +#include <util/generic/strbuf.h> +#include <util/system/compiler.h> +#include <util/system/yassert.h> + +/** + * @addtogroup Streams + * @{ + */ + +/** + * Simple stream tokenizer. Splits the stream into tokens that are available + * via iterator interface. + * + * @tparam TEndOfToken Predicate for token delimiter characters. + * @see TEol + */ +template <typename TEndOfToken> +class TStreamTokenizer { +public: + class TIterator { + public: + inline TIterator(TStreamTokenizer* const parent) + : Parent_(parent) + , AtEnd_(!Parent_->Next(Data_, Len_)) + { + } + + inline TIterator() noexcept + : Parent_(nullptr) + , Data_(nullptr) + , Len_(0) + , AtEnd_(true) + { + } + + inline ~TIterator() = default; + + inline void operator++() { + Next(); + } + + inline bool operator==(const TIterator& l) const noexcept { + return AtEnd_ == l.AtEnd_; + } + + inline bool operator!=(const TIterator& l) const noexcept { + return !(*this == l); + } + + /** + * @return Return null-terminated character array with current token. + * The pointer may be invalid after iterator increment. + */ + inline const char* Data() const noexcept { + Y_ASSERT(!AtEnd_); + + return Data_; + } + + /** + * @return Length of current token. 
+ */ + inline size_t Length() const noexcept { + Y_ASSERT(!AtEnd_); + + return Len_; + } + + inline TIterator* operator->() noexcept { + return this; + } + + inline TStringBuf operator*() noexcept { + return TStringBuf{Data_, Len_}; + } + + private: + inline void Next() { + Y_ASSERT(Parent_); + + AtEnd_ = !Parent_->Next(Data_, Len_); + } + + private: + TStreamTokenizer* const Parent_; + char* Data_; + size_t Len_; + bool AtEnd_; + }; + + inline TStreamTokenizer(IInputStream* const input, const TEndOfToken& eot = TEndOfToken(), + const size_t initial = 1024) + : Input_(input) + , Buf_(initial) + , Cur_(BufBegin()) + , End_(BufBegin()) + , Eot_(eot) + { + CheckBuf(); + } + + inline bool Next(char*& buf, size_t& len) { + char* it = Cur_; + + while (true) { + do { + while (it != End_) { + if (Eot_(*it)) { + *it = '\0'; + + buf = Cur_; + len = it - Cur_; + Cur_ = it + 1; + + return true; + } else { + ++it; + } + } + + if (Fill() == 0 && End_ != BufEnd()) { + *it = '\0'; + + buf = Cur_; + len = it - Cur_; + Cur_ = End_; + + return len; + } + } while (it != BufEnd()); + + Y_ASSERT(it == BufEnd()); + Y_ASSERT(End_ == BufEnd()); + + const size_t blen = End_ - Cur_; + if (Cur_ == BufBegin()) { + Y_ASSERT(blen == Buf_.Capacity()); + + /* + * do reallocate + */ + + Buf_.Reserve(Buf_.Capacity() * 4); + CheckBuf(); + } else { + /* + * do move + */ + + MemMove(BufBegin(), Cur_, blen); + } + + Cur_ = BufBegin(); + End_ = Cur_ + blen; + it = End_; + } + } + + inline TIterator begin() { + return TIterator{this}; + } + + inline TIterator end() noexcept { + return {}; + } + +private: + inline size_t Fill() { + const size_t avail = BufEnd() - End_; + const size_t bytesRead = Input_->Read(End_, avail); + + End_ += bytesRead; + + return bytesRead; + } + + inline char* BufBegin() noexcept { + return Buf_.Data(); + } + + inline char* BufEnd() noexcept { + return Buf_.Data() + Buf_.Capacity(); + } + + inline void CheckBuf() const { + if (!Buf_.Data()) { + throw std::bad_alloc(); + } + } + 
+private: + IInputStream* const Input_; + TBuffer Buf_; + char* Cur_; + char* End_; + TEndOfToken Eot_; +}; + +/** + * Predicate for `TStreamTokenizer` that uses '\\n' as a delimiter. + */ +struct TEol { + inline bool operator()(char ch) const noexcept { + return ch == '\n'; + } +}; + +/** @} */ diff --git a/util/stream/tokenizer_ut.cpp b/util/stream/tokenizer_ut.cpp new file mode 100644 index 0000000000..afc566da86 --- /dev/null +++ b/util/stream/tokenizer_ut.cpp @@ -0,0 +1,264 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/array_size.h> +#include <util/generic/strbuf.h> + +#include "mem.h" +#include "null.h" +#include "tokenizer.h" + +static inline void CheckIfNullTerminated(const TStringBuf str) { + UNIT_ASSERT_VALUES_EQUAL('\0', *(str.data() + str.size())); +} + +Y_UNIT_TEST_SUITE(TStreamTokenizerTests) { + Y_UNIT_TEST(EmptyStreamTest) { + auto&& input = TNullInput{}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + CheckIfNullTerminated(TStringBuf{it->Data(), it->Length()}); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(0, tokensCount); + } + + Y_UNIT_TEST(EmptyTokensTest) { + const char data[] = "\n\n"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + CheckIfNullTerminated(TStringBuf{it->Data(), it->Length()}); + UNIT_ASSERT_VALUES_EQUAL(0, it->Length()); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(2, tokensCount); + } + + Y_UNIT_TEST(LastTokenendDoesntSatisfyPredicateTest) { + const char data[] = "abc\ndef\nxxxxxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + auto&& input = 
TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + UNIT_ASSERT(tokensCount < tokensSize); + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } + + Y_UNIT_TEST(FirstTokenIsEmptyTest) { + const char data[] = "\ndef\nxxxxxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf(), TStringBuf("def"), TStringBuf("xxxxxx")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + UNIT_ASSERT(tokensCount < tokensSize); + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } + + Y_UNIT_TEST(PredicateDoesntMatch) { + const char data[] = "1234567890-=!@#$%^&*()_+QWERTYUIOP{}qwertyuiop[]ASDFGHJKL:"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(data, token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(1, tokensCount); + } + + Y_UNIT_TEST(SimpleTest) { + const char data[] = "qwerty\n1234567890\n"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf("qwerty"), TStringBuf("1234567890")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + 
auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + UNIT_ASSERT(tokensCount < tokensSize); + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } + + Y_UNIT_TEST(CustomPredicateTest) { + struct TIsVerticalBar { + inline bool operator()(const char ch) const noexcept { + return '|' == ch; + } + }; + + const char data[] = "abc|def|xxxxxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TIsVerticalBar>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + UNIT_ASSERT(tokensCount < tokensSize); + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } + + Y_UNIT_TEST(CustomPredicateSecondTest) { + struct TIsVerticalBar { + inline bool operator()(const char ch) const noexcept { + return '|' == ch || ',' == ch; + } + }; + + const char data[] = "abc|def|xxxxxx,abc|def|xxxxxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx"), + TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TIsVerticalBar>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) 
{ + UNIT_ASSERT(tokensCount < tokensSize); + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } + + Y_UNIT_TEST(FalsePredicateTest) { + struct TAlwaysFalse { + inline bool operator()(const char) const noexcept { + return false; + } + }; + + const char data[] = "1234567890-=!@#$%^&*()_+QWERTYUIOP{}qwertyuiop[]ASDFGHJKL:"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TAlwaysFalse>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(data, token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(1, tokensCount); + } + + Y_UNIT_TEST(TruePredicateTest) { + struct TAlwaysTrue { + inline bool operator()(const char) const noexcept { + return true; + } + }; + + const char data[] = "1234567890-=!@#$%^&*()_+QWERTYUIOP{}qwertyuiop[]ASDFGHJKL:"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TAlwaysTrue>{&input}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + CheckIfNullTerminated(TStringBuf{it->Data(), it->Length()}); + UNIT_ASSERT_VALUES_EQUAL(0, it->Length()); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(dataSize, tokensCount); + } + + Y_UNIT_TEST(FirstTokenHasSizeOfTheBufferTest) { + const char data[] = "xxxxx\nxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf("xxxxx"), TStringBuf("xx")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input, TEol{}, tokens[0].size()}; + auto 
tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } + + Y_UNIT_TEST(OnlyTokenHasSizeOfTheBufferTest) { + const char data[] = "xxxxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input, TEol{}, dataSize}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(data, token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(1, tokensCount); + } + + Y_UNIT_TEST(BufferSizeInitialSizeSmallerThanTokenTest) { + const char data[] = "xxxxx\nxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf("xxxxx"), TStringBuf("xx")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input, TEol{}, 1}; + auto tokensCount = size_t{}; + for (auto it = tokenizer.begin(); tokenizer.end() != it; ++it) { + const auto token = TStringBuf{it->Data(), it->Length()}; + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } + + Y_UNIT_TEST(RangeBasedForTest) { + const char data[] = "abc\ndef\nxxxxxx"; + const auto dataSize = Y_ARRAY_SIZE(data) - 1; + const TStringBuf tokens[] = {TStringBuf("abc"), TStringBuf("def"), TStringBuf("xxxxxx")}; + const auto tokensSize = Y_ARRAY_SIZE(tokens); + auto&& input = TMemoryInput{data, dataSize}; + auto&& tokenizer = TStreamTokenizer<TEol>{&input}; + auto tokensCount = size_t{}; + for (const auto& token : tokenizer) { + 
UNIT_ASSERT(tokensCount < tokensSize); + CheckIfNullTerminated(token); + UNIT_ASSERT_VALUES_EQUAL(tokens[tokensCount], token); + ++tokensCount; + } + UNIT_ASSERT_VALUES_EQUAL(tokensSize, tokensCount); + } +} diff --git a/util/stream/trace.cpp b/util/stream/trace.cpp new file mode 100644 index 0000000000..f37a0b76db --- /dev/null +++ b/util/stream/trace.cpp @@ -0,0 +1 @@ +#include "trace.h" diff --git a/util/stream/trace.h b/util/stream/trace.h new file mode 100644 index 0000000000..e74b6ecf3e --- /dev/null +++ b/util/stream/trace.h @@ -0,0 +1,60 @@ +#pragma once + +#include "debug.h" + +/** + * Debug level, as set via `DBGOUT` environment variable. + */ +enum ETraceLevel: ui8 { + TRACE_ERR = 1, + TRACE_WARN = 2, + TRACE_NOTICE = 3, + TRACE_INFO = 4, + TRACE_DEBUG = 5, + TRACE_DETAIL = 6, + TRACE_VERBOSE = 7 +}; + +#if !defined(NDEBUG) && !defined(Y_ENABLE_TRACE) + #define Y_ENABLE_TRACE +#endif + +#ifdef Y_ENABLE_TRACE + + /** + * Writes the given data into standard debug stream if current debug level set + * via `DBGOUT` environment variable permits it. + * + * Does nothing in release builds unless `Y_ENABLE_TRACE` is defined. + * + * Example usage: + * @code + * Y_DBGTRACE(DEBUG, "Advance from " << node1 << " to " << node2); + * @endcode + * + * @param elevel Debug level of this trace command, e.g. + * `WARN` or `DEBUG`. Basically a suffix of + * one of the values of `ETraceLevel` enum. + * @param args Argument chain to be written out into + * standard debug stream, joined with `<<` + * operator. 
+     * @see ETraceLevel
+     */
+    #define Y_DBGTRACE(elevel, args) Y_DBGTRACE0(int(TRACE_##elevel), args)
+    // Same as Y_DBGTRACE, but takes the numeric level instead of the enum suffix.
+    #define Y_DBGTRACE0(level, args) \
+        do \
+            if ((level) <= StdDbgLevel()) { \
+                StdDbgStream() << args << Endl; \
+            } \
+        while (false)
+
+#else
+
+    // Tracing disabled: both macros expand to an empty statement and `args`
+    // is never evaluated.
+    #define Y_DBGTRACE(elevel, args) \
+        do { \
+        } while (false)
+    #define Y_DBGTRACE0(level, args) \
+        do { \
+        } while (false)
+
+#endif
diff --git a/util/stream/ut/ya.make b/util/stream/ut/ya.make
new file mode 100644
index 0000000000..f0176dd7b4
--- /dev/null
+++ b/util/stream/ut/ya.make
@@ -0,0 +1,30 @@
+UNITTEST_FOR(util)
+
+OWNER(g:util)
+SUBSCRIBER(g:util-subscribers)
+
+SRCS(
+    stream/aligned_ut.cpp
+    stream/buffer_ut.cpp
+    stream/buffered_ut.cpp
+    stream/direct_io_ut.cpp
+    stream/file_ut.cpp
+    stream/format_ut.cpp
+    stream/hex_ut.cpp
+    stream/input_ut.cpp
+    stream/ios_ut.cpp
+    stream/labeled_ut.cpp
+    stream/length_ut.cpp
+    stream/mem_ut.cpp
+    stream/multi_ut.cpp
+    stream/printf_ut.cpp
+    stream/str_ut.cpp
+    stream/tokenizer_ut.cpp
+    stream/walk_ut.cpp
+    stream/zerocopy_output_ut.cpp
+    stream/zlib_ut.cpp
+)
+
+INCLUDE(${ARCADIA_ROOT}/util/tests/ya_util_tests.inc)
+
+END()
diff --git a/util/stream/walk.cpp b/util/stream/walk.cpp
new file mode 100644
index 0000000000..57dc9ab036
--- /dev/null
+++ b/util/stream/walk.cpp
@@ -0,0 +1,22 @@
+#include "walk.h"
+
+#include <util/generic/string.h>
+
+// Gives `len` bytes back to the current chunk: the cursor moves backwards and
+// the remaining length grows by the same amount. No bounds check is done here.
+void IWalkInput::DoUndo(size_t len) {
+    Len_ += len;
+    Buf_ = static_cast<const char*>(Buf_) - len;
+}
+
+// Hands out at most `len` bytes of the current chunk. A new chunk is requested
+// from DoUnboundedNext() only when the current one is fully consumed; a zero
+// result from it leaves Len_ at 0, so 0 is returned.
+size_t IWalkInput::DoNext(const void** ptr, size_t len) {
+    if (!Len_) {
+        Len_ = DoUnboundedNext(&Buf_);
+    }
+
+    len = Min(Len_, len);
+    *ptr = Buf_;
+
+    // Advance the cursor past the bytes that were just handed out.
+    Buf_ = static_cast<const char*>(Buf_) + len;
+    Len_ -= len;
+
+    return len;
+}
diff --git a/util/stream/walk.h b/util/stream/walk.h
new file mode 100644
index 0000000000..7e62cb44dc
--- /dev/null
+++ b/util/stream/walk.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include "zerocopy.h"
+
+/**
+ * Zero-copy stream that simplifies implementation of derived classes.
+ *
+ * Derived classes must implement `DoUnboundedNext` method.
+ */
+class IWalkInput: public IZeroCopyInputFastReadTo {
+public:
+    IWalkInput()
+        : Buf_(nullptr)
+        , Len_(0)
+    {
+    }
+
+protected:
+    void DoUndo(size_t len) override;
+    size_t DoNext(const void** ptr, size_t len) override;
+
+    /**
+     * Returns the next data chunk from this input stream. There are no
+     * restrictions on the size of the data chunk.
+     *
+     * @param ptr[out] Pointer to the start of the data chunk.
+     * @returns Size of the returned data chunk, in bytes.
+     * Return value of zero signals end of stream.
+     */
+    virtual size_t DoUnboundedNext(const void** ptr) = 0;
+
+private:
+    const void* Buf_; // cursor inside the chunk last obtained from DoUnboundedNext
+    size_t Len_;      // bytes of that chunk not yet handed out
+};
diff --git a/util/stream/walk_ut.cpp b/util/stream/walk_ut.cpp
new file mode 100644
index 0000000000..e0a783799f
--- /dev/null
+++ b/util/stream/walk_ut.cpp
@@ -0,0 +1,55 @@
+#include "walk.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+// Test helper: serves the strings of a vector one per chunk. Only a reference
+// to the vector is kept, so the vector must outlive the stream.
+class TStringListInput: public IWalkInput {
+public:
+    TStringListInput(const TVector<TString>& data)
+        : Data_(data)
+        , Index_(0)
+    {
+    }
+
+protected:
+    // Returns the next string as a chunk, or 0 once every string was served.
+    size_t DoUnboundedNext(const void** ptr) override {
+        if (Index_ >= Data_.size()) {
+            return 0;
+        }
+
+        const TString& string = Data_[Index_++];
+
+        *ptr = string.data();
+        return string.size();
+    }
+
+private:
+    const TVector<TString>& Data_;
+    size_t Index_; // index of the next string to serve
+};
+
+Y_UNIT_TEST_SUITE(TWalkTest) {
+    // ReadTo must cross chunk boundaries and leave the stream positioned right
+    // after the delimiter, so that the following reads continue from there.
+    Y_UNIT_TEST(ReadTo) {
+        TVector<TString> data;
+        data.push_back("111a");
+        data.push_back("222b");
+        data.push_back("333c");
+        data.push_back("444d");
+        data.push_back("555e");
+        data.push_back("666f");
+
+        TStringListInput input(data);
+
+        TString tmp1 = input.ReadTo('c');
+        UNIT_ASSERT_VALUES_EQUAL(tmp1, "111a222b333");
+
+        char tmp2;
+        input.Read(&tmp2, 1);
+        UNIT_ASSERT_VALUES_EQUAL(tmp2, '4');
+
+        TString tmp3 = input.ReadTo('6');
+        UNIT_ASSERT_VALUES_EQUAL(tmp3, "44d555e");
+
+        TString tmp4 = input.ReadAll();
+        UNIT_ASSERT_VALUES_EQUAL(tmp4, "66f");
+    }
+}
diff --git
a/util/stream/ya.make b/util/stream/ya.make
new file mode 100644
index 0000000000..79c9498ddd
--- /dev/null
+++ b/util/stream/ya.make
@@ -0,0 +1,6 @@
+OWNER(g:util)
+SUBSCRIBER(g:util-subscribers)
+
+RECURSE_FOR_TESTS(
+    ut
+)
diff --git a/util/stream/zerocopy.cpp b/util/stream/zerocopy.cpp
new file mode 100644
index 0000000000..dc2982ad55
--- /dev/null
+++ b/util/stream/zerocopy.cpp
@@ -0,0 +1,60 @@
+#include "zerocopy.h"
+#include "output.h"
+
+IZeroCopyInput::~IZeroCopyInput() = default;
+
+// Copying read on top of DoNext(): takes one chunk of at most `len` bytes and
+// copies it into `buf`. Returns the chunk size; 0 means nothing was copied.
+size_t IZeroCopyInput::DoRead(void* buf, size_t len) {
+    const void* ptr;
+    size_t result = DoNext(&ptr, len);
+
+    if (result) {
+        memcpy(buf, ptr, result);
+    }
+
+    return result;
+}
+
+// Writes every remaining chunk to `out` and returns the total number of bytes.
+ui64 IZeroCopyInput::DoReadAll(IOutputStream& out) {
+    ui64 result = 0;
+    const void* ptr;
+
+    while (size_t len = Next(&ptr)) {
+        out.Write(ptr, len);
+        result += len;
+    }
+
+    return result;
+}
+
+// Skips by taking one chunk of at most `len` bytes and discarding it; the
+// returned count may therefore be smaller than `len`.
+size_t IZeroCopyInput::DoSkip(size_t len) {
+    const void* ptr;
+
+    return DoNext(&ptr, len);
+}
+
+IZeroCopyInputFastReadTo::~IZeroCopyInputFastReadTo() = default;
+
+// Reads up to and including the first `ch`, storing the bytes before it in `st`.
+//
+// Returns the number of bytes consumed from the stream (the delimiter counts,
+// but is not stored). Returns 0 without touching `st` when the stream is
+// already at its end. If `ch` never occurs, all remaining data goes to `st`.
+size_t IZeroCopyInputFastReadTo::DoReadTo(TString& st, char ch) {
+    const char* ptr;
+    size_t len = Next(&ptr);
+    if (!len) {
+        return 0;
+    }
+    size_t result = 0;
+    st.clear();
+    do {
+        if (const char* pos = static_cast<const char*>(memchr(ptr, ch, len))) {
+            size_t bytesRead = (pos - ptr) + 1;
+            if (bytesRead > 1) {
+                st.append(ptr, pos);
+            }
+            // Hand the part of the chunk after the delimiter back to the stream.
+            Undo(len - bytesRead);
+            result += bytesRead;
+            return result;
+        } else {
+            result += len;
+            st.append(ptr, len);
+        }
+        // Explicit comparison: the assignment here is intentional.
+    } while ((len = Next(&ptr)) != 0);
+    return result;
+}
diff --git a/util/stream/zerocopy.h b/util/stream/zerocopy.h
new file mode 100644
index 0000000000..3315aa3a51
--- /dev/null
+++ b/util/stream/zerocopy.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include <util/system/yassert.h>
+#include <util/system/defaults.h>
+#include <util/generic/ylimits.h>
+
+#include "input.h"
+
+class IOutputStream;
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Input stream with direct access to the input buffer.
+ *
+ * Derived classes must implement `DoNext` method.
+ */
+class IZeroCopyInput: public IInputStream {
+public:
+    IZeroCopyInput() noexcept = default;
+    ~IZeroCopyInput() override;
+
+    IZeroCopyInput(IZeroCopyInput&&) noexcept = default;
+    IZeroCopyInput& operator=(IZeroCopyInput&&) noexcept = default;
+
+    /**
+     * Returns the next data chunk from this input stream.
+     *
+     * Note that this function is not guaranteed to return the requested number
+     * of bytes, even if they are in fact available in the stream.
+     *
+     * @param ptr[out] Pointer to the start of the data chunk.
+     * @param len[in] Maximal size of the data chunk to be returned, in bytes.
+     * @returns Size of the returned data chunk, in bytes.
+     * Return value of zero signals end of stream.
+     */
+    template <class T>
+    inline size_t Next(T** ptr, size_t len) {
+        Y_ASSERT(ptr);
+
+        return DoNext((const void**)ptr, len);
+    }
+
+    /**
+     * Same as the two-argument overload with no upper bound on the chunk size
+     * (`Max<size_t>()` is passed as `len`).
+     */
+    template <class T>
+    inline size_t Next(T** ptr) {
+        return Next(ptr, Max<size_t>());
+    }
+
+protected:
+    size_t DoRead(void* buf, size_t len) override;
+    size_t DoSkip(size_t len) override;
+    ui64 DoReadAll(IOutputStream& out) override;
+    virtual size_t DoNext(const void** ptr, size_t len) = 0;
+};
+
+/**
+* Input stream with direct access to the input buffer and the ability to undo a read.
+*
+* Derived classes must implement `DoUndo` method.
+*/
+class IZeroCopyInputFastReadTo: public IZeroCopyInput {
+public:
+    IZeroCopyInputFastReadTo() noexcept = default;
+    ~IZeroCopyInputFastReadTo() override;
+
+    IZeroCopyInputFastReadTo(IZeroCopyInputFastReadTo&&) noexcept = default;
+    IZeroCopyInputFastReadTo& operator=(IZeroCopyInputFastReadTo&&) noexcept = default;
+
+protected:
+    size_t DoReadTo(TString& st, char ch) override;
+
+private:
+    /**
+     * Undo read.
+     *
+     * Note that this function does not check whether more bytes are undone
+     * than were read. It is meant for returning the unused tail of the last
+     * chunk. A zero `len` is a no-op: `DoUndo` is not called.
+     *
+     * @param len[in] Bytes to undo.
+     */
+    inline void Undo(size_t len) {
+        if (len) {
+            DoUndo(len);
+        }
+    }
+    virtual void DoUndo(size_t len) = 0;
+};
+
+/** @} */
diff --git a/util/stream/zerocopy_output.cpp b/util/stream/zerocopy_output.cpp
new file mode 100644
index 0000000000..23600ef6e1
--- /dev/null
+++ b/util/stream/zerocopy_output.cpp
@@ -0,0 +1,18 @@
+#include "zerocopy_output.h"
+
+#include <util/generic/utility.h>
+
+// Copying write on top of DoNext()/DoUndo(): repeatedly asks for a buffer,
+// copies as much of the input as fits and returns the unused tail of the last
+// buffer through DoUndo().
+void IZeroCopyOutput::DoWrite(const void* buf, size_t len) {
+    void* ptr = nullptr;
+    size_t writtenBytes = 0;
+    while (writtenBytes < len) {
+        size_t bufferSize = DoNext(&ptr);
+        // DoNext() must hand out a non-empty buffer, otherwise this loop cannot progress.
+        Y_ASSERT(ptr && bufferSize > 0);
+        size_t toWrite = Min(bufferSize, len - writtenBytes);
+        memcpy(ptr, static_cast<const char*>(buf) + writtenBytes, toWrite);
+        writtenBytes += toWrite;
+        if (toWrite < bufferSize) {
+            DoUndo(bufferSize - toWrite);
+        }
+    }
+}
diff --git a/util/stream/zerocopy_output.h b/util/stream/zerocopy_output.h
new file mode 100644
index 0000000000..a388be04b0
--- /dev/null
+++ b/util/stream/zerocopy_output.h
@@ -0,0 +1,57 @@
+#pragma once
+
+#include <util/system/yassert.h>
+
+#include "output.h"
+
+/**
+ * @addtogroup Streams
+ * @{
+ */
+
+/**
+ * Output stream with direct access to the output buffer.
+ *
+ * Derived classes must implement `DoNext` and `DoUndo` methods.
+ */
+class IZeroCopyOutput: public IOutputStream {
+public:
+    IZeroCopyOutput() noexcept = default;
+    ~IZeroCopyOutput() override = default;
+
+    IZeroCopyOutput(IZeroCopyOutput&&) noexcept = default;
+    IZeroCopyOutput& operator=(IZeroCopyOutput&&) noexcept = default;
+
+    /**
+     * Returns the next buffer to write to from this output stream.
+     *
+     * @param ptr[out] Pointer to the start of the buffer.
+     * @returns Size of the returned buffer, in bytes.
+     * Return value is always nonzero.
+     */
+    template <class T>
+    inline size_t Next(T** ptr) {
+        Y_ASSERT(ptr);
+
+        return DoNext((void**)ptr);
+    }
+
+    /**
+     * Tells the stream that `len` bytes at the end of the buffer returned previously
+     * by Next were actually not written so the current position in stream must be moved backwards.
+     * `len` must not be greater than the size of the buffer previously returned by `Next`.
+     *
+     * @param len[in] Number of bytes at the end to move the position by.
+     *
+     */
+    inline void Undo(size_t len) {
+        return DoUndo(len);
+    }
+
+protected:
+    void DoWrite(const void* buf, size_t len) override;
+    virtual size_t DoNext(void** ptr) = 0;
+    virtual void DoUndo(size_t len) = 0;
+};
+
+/** @} */
diff --git a/util/stream/zerocopy_output_ut.cpp b/util/stream/zerocopy_output_ut.cpp
new file mode 100644
index 0000000000..e81f7fb056
--- /dev/null
+++ b/util/stream/zerocopy_output_ut.cpp
@@ -0,0 +1,48 @@
+#include "zerocopy_output.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/string.h>
+
+// This version of string output stream is written here only
+class TSimpleStringOutput: public IZeroCopyOutput { +public: + TSimpleStringOutput(TString& s) noexcept + : S_(s) + { + } + +private: + size_t DoNext(void** ptr) override { + if (S_.size() == S_.capacity()) { + S_.reserve(FastClp2(S_.capacity() + 1)); + } + size_t previousSize = S_.size(); + S_.resize(S_.capacity()); + *ptr = S_.begin() + previousSize; + return S_.size() - previousSize; + } + + void DoUndo(size_t len) override { + Y_ENSURE(len <= S_.size()); + S_.resize(S_.size() - len); + } + + TString& S_; +}; + +Y_UNIT_TEST_SUITE(TestZerocopyOutput) { + Y_UNIT_TEST(Write) { + TString str; + TSimpleStringOutput output(str); + TString result; + + for (size_t i = 0; i < 1000; ++i) { + result.push_back('a' + (i % 20)); + } + + output.Write(result.begin(), result.size()); + UNIT_ASSERT_STRINGS_EQUAL(str, result); + } +} diff --git a/util/stream/zlib.cpp b/util/stream/zlib.cpp new file mode 100644 index 0000000000..60f4e9439f --- /dev/null +++ b/util/stream/zlib.cpp @@ -0,0 +1,380 @@ +#include "zlib.h" + +#include <util/memory/addstorage.h> +#include <util/generic/scope.h> +#include <util/generic/utility.h> + +#include <contrib/libs/zlib/zlib.h> + +#include <cstdio> +#include <cstring> + +namespace { + static const int opts[] = { + //Auto + 15 + 32, + //ZLib + 15 + 0, + //GZip + 15 + 16, + //Raw + -15}; + + class TZLibCommon { + public: + inline TZLibCommon() noexcept { + memset(Z(), 0, sizeof(*Z())); + } + + inline ~TZLibCommon() = default; + + inline const char* GetErrMsg() const noexcept { + return Z()->msg != nullptr ? 
Z()->msg : "unknown error";
+        }
+
+        inline z_stream* Z() const noexcept {
+            return (z_stream*)(&Z_);
+        }
+
+    private:
+        z_stream Z_;
+    };
+
+    // Clamps a byte count to what fits into a 32-bit zlib counter.
+    static inline ui32 MaxPortion(size_t s) noexcept {
+        return (ui32)Min<size_t>(Max<ui32>(), s);
+    }
+
+    // Adapter that feeds chunks of a zero-copy input to zlib, splitting them
+    // so that each piece fits into the length type `T` of the consumer.
+    struct TChunkedZeroCopyInput {
+        inline TChunkedZeroCopyInput(IZeroCopyInput* in)
+            : In(in)
+            , Buf(nullptr)
+            , Len(0)
+        {
+        }
+
+        // Stores the next piece in `*buf`/`*len`; returns false at end of input.
+        template <class P, class T>
+        inline bool Next(P** buf, T* len) {
+            if (!Len) {
+                Len = In->Next(&Buf);
+                if (!Len) {
+                    return false;
+                }
+            }
+
+            const T toread = (T)Min((size_t)Max<T>(), Len);
+
+            *len = toread;
+            *buf = (P*)Buf;
+
+            Buf += toread;
+            Len -= toread;
+
+            return true;
+        }
+
+        IZeroCopyInput* In;
+        const char* Buf;
+        size_t Len;
+    };
+}
+
+/**
+ * Inflate engine behind TZLibDecompress: pulls compressed chunks from a
+ * zero-copy input and inflates them into caller-provided buffers.
+ */
+class TZLibDecompress::TImpl: private TZLibCommon, public TChunkedZeroCopyInput {
+public:
+    /**
+     * @param in   Source of compressed data.
+     * @param type Index into `opts` selecting the window-bits value.
+     * @param dict Optional dictionary. It is applied immediately for raw
+     *             streams, otherwise when inflate() reports Z_NEED_DICT.
+     * @throws TZLibDecompressorError on an out-of-range `type`, on
+     *         inflateInit2 failure or when the dictionary is rejected.
+     */
+    inline TImpl(IZeroCopyInput* in, ZLib::StreamType type, TStringBuf dict)
+        : TChunkedZeroCopyInput(in)
+        , Dict(dict)
+    {
+        // `type` indexes `opts`; reject values outside the table instead of
+        // reading past its end.
+        if (static_cast<size_t>(type) >= sizeof(opts) / sizeof(opts[0])) {
+            ythrow TZLibDecompressorError() << "invalid stream type: " << static_cast<unsigned long>(type);
+        }
+
+        if (inflateInit2(Z(), opts[type]) != Z_OK) {
+            ythrow TZLibDecompressorError() << "can not init inflate engine";
+        }
+
+        if (dict.size() && type == ZLib::Raw) {
+            // The destructor does not run when a constructor throws, so the
+            // inflate state has to be released here.
+            try {
+                SetDict();
+            } catch (...) {
+                inflateEnd(Z());
+                throw;
+            }
+        }
+    }
+
+    virtual ~TImpl() {
+        inflateEnd(Z());
+    }
+
+    void SetAllowMultipleStreams(bool allowMultipleStreams) {
+        AllowMultipleStreams_ = allowMultipleStreams;
+    }
+
+    /**
+     * Inflates up to `size` bytes into `buf`.
+     *
+     * At most MaxPortion(size) bytes are produced per call because zlib's
+     * `avail_out` is a 32-bit counter; callers needing more call again.
+     *
+     * @returns Number of bytes written to `buf`; 0 at end of input or when
+     *          `size` is 0.
+     * @throws TZLibDecompressorError on an inflate or reset failure.
+     */
+    inline size_t Read(void* buf, size_t size) {
+        if (size == 0) {
+            return 0;
+        }
+
+        // Byte counts below are computed from the clamped value, not `size`.
+        const ui32 portion = MaxPortion(size);
+
+        Z()->next_out = (unsigned char*)buf;
+        Z()->avail_out = portion;
+
+        while (true) {
+            if (Z()->avail_in == 0) {
+                if (!FillInputBuffer()) {
+                    return 0;
+                }
+            }
+
+            switch (inflate(Z(), Z_SYNC_FLUSH)) {
+                case Z_NEED_DICT: {
+                    SetDict();
+                    continue;
+                }
+
+                case Z_STREAM_END: {
+                    if (AllowMultipleStreams_) {
+                        // Get ready for a following concatenated stream.
+                        if (inflateReset(Z()) != Z_OK) {
+                            ythrow TZLibDecompressorError() << "inflate reset error(" << GetErrMsg() << ")";
+                        }
+                    } else {
+                        return portion - Z()->avail_out;
+                    }
+
+                    [[fallthrough]];
+                }
+
+                case Z_OK: {
+                    const size_t processed = portion - Z()->avail_out;
+
+                    if (processed) {
+                        return processed;
+                    }
+
+                    // Nothing produced yet: loop for more input.
+                    break;
+                }
+
+                default:
+                    ythrow TZLibDecompressorError() << "inflate error(" << GetErrMsg() << ")";
+            }
+        }
+    }
+
+private:
+    inline bool FillInputBuffer() {
+        return Next(&Z()->next_in, &Z()->avail_in);
+    }
+
+    // Installs Dict as the inflate dictionary. Throws the decompressor error
+    // type, like every other failure reported by this class.
+    void SetDict() {
+        if (inflateSetDictionary(Z(), (const Bytef*)Dict.data(), Dict.size()) != Z_OK) {
+            ythrow TZLibDecompressorError() << "can not set inflate dictionary";
+        }
+    }
+
+    bool AllowMultipleStreams_ = true;
+    TStringBuf Dict;
+};
+
+namespace {
+    class TDecompressStream: public IZeroCopyInput, public TZLibDecompress::TImpl, public TAdditionalStorage<TDecompressStream> {
+    public:
+        inline TDecompressStream(IInputStream* input, ZLib::StreamType type, TStringBuf dict)
+            : TZLibDecompress::TImpl(this, type, dict)
+            , Stream_(input)
+        {
+        }
+
+        ~TDecompressStream() override = default;
+
+    private:
+        size_t DoNext(const void** ptr, size_t len) override {
+            void* buf = AdditionalData();
+
+            *ptr = buf;
+            return Stream_->Read(buf, Min(len, AdditionalDataLength()));
+        }
+
+    private:
+        IInputStream* Stream_;
+    };
+
+    using TZeroCopyDecompress = TZLibDecompress::TImpl;
+}
+
+class TZLibCompress::TImpl: public TAdditionalStorage<TImpl>, private TZLibCommon {
+    static inline ZLib::StreamType Type(ZLib::StreamType type) {
+        if (type == ZLib::Auto) {
+            return ZLib::ZLib;
+        }
+
+        if (type >= ZLib::Invalid) {
+            ythrow TZLibError() << "invalid compression type: " << static_cast<unsigned long>(type);
+        }
+
+        return type;
+    }
+
+public:
+    inline TImpl(const TParams& p)
+        : Stream_(p.Out)
+    {
+        if (deflateInit2(Z(), Min<size_t>(9, p.CompressionLevel), Z_DEFLATED, opts[Type(p.Type)], 8, Z_DEFAULT_STRATEGY)) {
+            ythrow TZLibCompressorError() << "can not init inflate engine";
+        }
+
+        // Create exactly the same files on all platforms by fixing OS field in the header.
+ if (p.Type == ZLib::GZip) { + GZHeader_ = MakeHolder<gz_header>(); + GZHeader_->os = 3; // UNIX + deflateSetHeader(Z(), GZHeader_.Get()); + } + + if (p.Dict.size()) { + if (deflateSetDictionary(Z(), (const Bytef*)p.Dict.data(), p.Dict.size())) { + ythrow TZLibCompressorError() << "can not set deflate dictionary"; + } + } + + Z()->next_out = TmpBuf(); + Z()->avail_out = TmpBufLen(); + } + + inline ~TImpl() { + deflateEnd(Z()); + } + + inline void Write(const void* buf, size_t size) { + const Bytef* b = (const Bytef*)buf; + const Bytef* e = b + size; + + Y_DEFER { + Z()->next_in = nullptr; + Z()->avail_in = 0; + }; + do { + b = WritePart(b, e); + } while (b < e); + } + + inline const Bytef* WritePart(const Bytef* b, const Bytef* e) { + Z()->next_in = const_cast<Bytef*>(b); + Z()->avail_in = MaxPortion(e - b); + + while (Z()->avail_in) { + const int ret = deflate(Z(), Z_NO_FLUSH); + + switch (ret) { + case Z_OK: + continue; + + case Z_BUF_ERROR: + FlushBuffer(); + + break; + + default: + ythrow TZLibCompressorError() << "deflate error(" << GetErrMsg() << ")"; + } + } + + return Z()->next_in; + } + + inline void Flush() { + int ret = deflate(Z(), Z_SYNC_FLUSH); + + while ((ret == Z_OK || ret == Z_BUF_ERROR) && !Z()->avail_out) { + FlushBuffer(); + ret = deflate(Z(), Z_SYNC_FLUSH); + } + + if (ret != Z_OK && ret != Z_BUF_ERROR) { + ythrow TZLibCompressorError() << "deflate flush error(" << GetErrMsg() << ")"; + } + + if (Z()->avail_out < TmpBufLen()) { + FlushBuffer(); + } + } + + inline void FlushBuffer() { + Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out); + Z()->next_out = TmpBuf(); + Z()->avail_out = TmpBufLen(); + } + + inline void Finish() { + int ret = deflate(Z(), Z_FINISH); + + while (ret == Z_OK || ret == Z_BUF_ERROR) { + FlushBuffer(); + ret = deflate(Z(), Z_FINISH); + } + + if (ret == Z_STREAM_END) { + Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out); + } else { + ythrow TZLibCompressorError() << "deflate finish error(" << GetErrMsg() << ")"; 
+ } + } + +private: + inline unsigned char* TmpBuf() noexcept { + return (unsigned char*)AdditionalData(); + } + + inline size_t TmpBufLen() const noexcept { + return AdditionalDataLength(); + } + +private: + IOutputStream* Stream_; + THolder<gz_header> GZHeader_; +}; + +TZLibDecompress::TZLibDecompress(IZeroCopyInput* input, ZLib::StreamType type, TStringBuf dict) + : Impl_(new TZeroCopyDecompress(input, type, dict)) +{ +} + +TZLibDecompress::TZLibDecompress(IInputStream* input, ZLib::StreamType type, size_t buflen, TStringBuf dict) + : Impl_(new (buflen) TDecompressStream(input, type, dict)) +{ +} + +void TZLibDecompress::SetAllowMultipleStreams(bool allowMultipleStreams) { + Impl_->SetAllowMultipleStreams(allowMultipleStreams); +} + +TZLibDecompress::~TZLibDecompress() = default; + +size_t TZLibDecompress::DoRead(void* buf, size_t size) { + return Impl_->Read(buf, MaxPortion(size)); +} + +void TZLibCompress::Init(const TParams& params) { + Y_ENSURE(params.BufLen >= 16, "ZLib buffer too small"); + Impl_.Reset(new (params.BufLen) TImpl(params)); +} + +void TZLibCompress::TDestruct::Destroy(TImpl* impl) { + delete impl; +} + +TZLibCompress::~TZLibCompress() { + try { + Finish(); + } catch (...) 
{ + // ¯\_(ツ)_/¯ + } +} + +void TZLibCompress::DoWrite(const void* buf, size_t size) { + if (!Impl_) { + ythrow TZLibCompressorError() << "can not write to finished zlib stream"; + } + + Impl_->Write(buf, size); +} + +void TZLibCompress::DoFlush() { + if (Impl_) { + Impl_->Flush(); + } +} + +void TZLibCompress::DoFinish() { + THolder<TImpl> impl(Impl_.Release()); + + if (impl) { + impl->Finish(); + } +} + +TBufferedZLibDecompress::~TBufferedZLibDecompress() = default; diff --git a/util/stream/zlib.h b/util/stream/zlib.h new file mode 100644 index 0000000000..e7de7c81b7 --- /dev/null +++ b/util/stream/zlib.h @@ -0,0 +1,173 @@ +#pragma once + +#include "fwd.h" +#include "input.h" +#include "output.h" +#include "buffered.h" + +#include <util/system/defaults.h> +#include <util/generic/ptr.h> +#include <util/generic/yexception.h> + +/** + * @addtogroup Streams_Archs + * @{ + */ + +struct TZLibError: public yexception { +}; + +struct TZLibCompressorError: public TZLibError { +}; + +struct TZLibDecompressorError: public TZLibError { +}; + +namespace ZLib { + enum StreamType: ui8 { + Auto = 0, /**< Auto detect format. Can be used for decompression only. */ + ZLib = 1, + GZip = 2, + Raw = 3, + Invalid = 4 + }; + + enum { + ZLIB_BUF_LEN = 8 * 1024 + }; +} + +/** + * Non-buffered ZLib decompressing stream. + * + * Please don't use `TZLibDecompress` if you read text data from stream using + * `ReadLine`, it is VERY slow (approx 10 times slower, according to synthetic + * benchmark). For fast buffered ZLib stream reading use `TBufferedZLibDecompress` + * aka `TZDecompress`. + */ +class TZLibDecompress: public IInputStream { +public: + TZLibDecompress(IZeroCopyInput* input, ZLib::StreamType type = ZLib::Auto, TStringBuf dict = {}); + TZLibDecompress(IInputStream* input, ZLib::StreamType type = ZLib::Auto, size_t buflen = ZLib::ZLIB_BUF_LEN, + TStringBuf dict = {}); + + /** + * Allows/disallows multiple sequential compressed streams. Allowed by default. 
+ * + * If multiple streams are allowed, their decompressed content will be concatenated. + * If multiple streams are disabled, then only first stream is decompressed. After that end + * of IInputStream will have happen, i.e. method Read() will return 0. + * + * @param allowMultipleStreams - flag to allow (true) or disable (false) multiple streams. + */ + void SetAllowMultipleStreams(bool allowMultipleStreams); + + ~TZLibDecompress() override; + +protected: + size_t DoRead(void* buf, size_t size) override; + +public: + class TImpl; + THolder<TImpl> Impl_; +}; + +/** + * Non-buffered ZLib compressing stream. + */ +class TZLibCompress: public IOutputStream { +public: + struct TParams { + inline TParams(IOutputStream* out) + : Out(out) + , Type(ZLib::ZLib) + , CompressionLevel(6) + , BufLen(ZLib::ZLIB_BUF_LEN) + { + } + + inline TParams& SetType(ZLib::StreamType type) noexcept { + Type = type; + + return *this; + } + + inline TParams& SetCompressionLevel(size_t level) noexcept { + CompressionLevel = level; + + return *this; + } + + inline TParams& SetBufLen(size_t buflen) noexcept { + BufLen = buflen; + + return *this; + } + + inline TParams& SetDict(const TStringBuf dict) noexcept { + Dict = dict; + + return *this; + } + + IOutputStream* Out; + ZLib::StreamType Type; + size_t CompressionLevel; + size_t BufLen; + TStringBuf Dict; + }; + + inline TZLibCompress(const TParams& params) { + Init(params); + } + + inline TZLibCompress(IOutputStream* out, ZLib::StreamType type) { + Init(TParams(out).SetType(type)); + } + + inline TZLibCompress(IOutputStream* out, ZLib::StreamType type, size_t compression_level) { + Init(TParams(out).SetType(type).SetCompressionLevel(compression_level)); + } + + inline TZLibCompress(IOutputStream* out, ZLib::StreamType type, size_t compression_level, size_t buflen) { + Init(TParams(out).SetType(type).SetCompressionLevel(compression_level).SetBufLen(buflen)); + } + + ~TZLibCompress() override; + +private: + void Init(const TParams& opts); + + 
void DoWrite(const void* buf, size_t size) override; + void DoFlush() override; + void DoFinish() override; + +public: + class TImpl; + + /** To allow inline constructors. */ + struct TDestruct { + static void Destroy(TImpl* impl); + }; + + THolder<TImpl, TDestruct> Impl_; +}; + +/** + * Buffered ZLib decompressing stream. + * + * Supports efficient `ReadLine` calls and similar "reading in small pieces" + * usage patterns. + */ +class TBufferedZLibDecompress: public TBuffered<TZLibDecompress> { +public: + template <class T> + inline TBufferedZLibDecompress(T* in, ZLib::StreamType type = ZLib::Auto, size_t buf = 1 << 13) + : TBuffered<TZLibDecompress>(buf, in, type) + { + } + + ~TBufferedZLibDecompress() override; +}; + +/** @} */ diff --git a/util/stream/zlib_ut.cpp b/util/stream/zlib_ut.cpp new file mode 100644 index 0000000000..2290b4a9de --- /dev/null +++ b/util/stream/zlib_ut.cpp @@ -0,0 +1,230 @@ +#include "zlib.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include "file.h" +#include <util/system/tempfile.h> +#include <util/random/entropy.h> +#include <util/random/random.h> + +#define ZDATA "./zdata" + +class TThrowingStream: public IOutputStream { +public: + TThrowingStream(int limit) + : Limit_(limit) + { + } + + void DoWrite(const void*, size_t size) override { + if (Ignore) { + return; + } + + Limit_ -= size; + if (Limit_ < 0) { + throw yexception() << "catch this"; + } + } + + void DoFinish() override { + if (Ignore) { + return; + } + if (Limit_ < 0) { + throw yexception() << "catch this"; + } + } + + void DoFlush() override { + if (Ignore) { + return; + } + if (Limit_ < 0) { + throw yexception() << "catch this"; + } + } + + bool Ignore = false; + +private: + int Limit_; +}; + +Y_UNIT_TEST_SUITE(TZLibTest) { + static const TString DATA = "8s7d5vc6s5vc67sa4c65ascx6asd4xcv76adsfxv76s"; + static const TString DATA2 = "cn8wk2bd9vb3vdfif83g1ks94bfiovtwv"; + + Y_UNIT_TEST(Compress) { + TUnbufferedFileOutput o(ZDATA); + TZLibCompress c(&o, 
ZLib::ZLib); + + c.Write(DATA.data(), DATA.size()); + c.Finish(); + o.Finish(); + } + + Y_UNIT_TEST(Decompress) { + TTempFile tmpFile(ZDATA); + + { + TUnbufferedFileInput i(ZDATA); + TZLibDecompress d(&i); + + UNIT_ASSERT_EQUAL(d.ReadAll(), DATA); + } + } + + Y_UNIT_TEST(Dictionary) { + static constexpr TStringBuf data = "<html><body></body></html>"; + static constexpr TStringBuf dict = "</<html><body>"; + for (auto type : {ZLib::Raw, ZLib::ZLib}) { + TStringStream compressed; + { + TZLibCompress compressor(TZLibCompress::TParams(&compressed).SetDict(dict).SetType(type)); + compressor.Write(data); + } + + TZLibDecompress decompressor(&compressed, type, ZLib::ZLIB_BUF_LEN, dict); + UNIT_ASSERT_STRINGS_EQUAL(decompressor.ReadAll(), data); + } + } + + Y_UNIT_TEST(DecompressTwoStreams) { + // Check that Decompress(Compress(X) + Compress(Y)) == X + Y + TTempFile tmpFile(ZDATA); + { + TUnbufferedFileOutput o(ZDATA); + TZLibCompress c1(&o, ZLib::ZLib); + c1.Write(DATA.data(), DATA.size()); + c1.Finish(); + TZLibCompress c2(&o, ZLib::ZLib); + c2.Write(DATA2.data(), DATA2.size()); + c2.Finish(); + o.Finish(); + } + { + TUnbufferedFileInput i(ZDATA); + TZLibDecompress d(&i); + + UNIT_ASSERT_EQUAL(d.ReadAll(), DATA + DATA2); + } + } + + Y_UNIT_TEST(CompressionExceptionSegfault) { + TVector<char> buf(512 * 1024); + EntropyPool().Load(buf.data(), buf.size()); + + TThrowingStream o(128 * 1024); + TZLibCompress c(&o, ZLib::GZip, 4, 1 << 15); + try { + c.Write(buf.data(), buf.size()); + } catch (...) 
{ + } + + o.Ignore = true; + TVector<char>().swap(buf); + } + + Y_UNIT_TEST(DecompressFirstOfTwoStreams) { + // Check that Decompress(Compress(X) + Compress(Y)) == X when single stream is allowed + TTempFile tmpFile(ZDATA); + { + TUnbufferedFileOutput o(ZDATA); + TZLibCompress c1(&o, ZLib::ZLib); + c1.Write(DATA.data(), DATA.size()); + c1.Finish(); + TZLibCompress c2(&o, ZLib::ZLib); + c2.Write(DATA2.data(), DATA2.size()); + c2.Finish(); + o.Finish(); + } + { + TUnbufferedFileInput i(ZDATA); + TZLibDecompress d(&i); + d.SetAllowMultipleStreams(false); + + UNIT_ASSERT_EQUAL(d.ReadAll(), DATA); + } + } + + Y_UNIT_TEST(CompressFlush) { + TString data = ""; + + for (size_t i = 0; i < 32; ++i) { + TTempFile tmpFile(ZDATA); + + TUnbufferedFileOutput output(ZDATA); + TZLibCompress compressor(&output, ZLib::ZLib); + + compressor.Write(data.data(), data.size()); + compressor.Flush(); + + { + TUnbufferedFileInput input(ZDATA); + TZLibDecompress decompressor(&input); + + UNIT_ASSERT_EQUAL(decompressor.ReadAll(), data); + } + + data += 'A' + i; + } + } + + Y_UNIT_TEST(CompressEmptyFlush) { + TTempFile tmpFile(ZDATA); + + TUnbufferedFileOutput output(ZDATA); + TZLibCompress compressor(&output, ZLib::ZLib); + + TUnbufferedFileInput input(ZDATA); + + compressor.Write(DATA.data(), DATA.size()); + compressor.Flush(); + + { + TZLibDecompress decompressor(&input); + UNIT_ASSERT_EQUAL(decompressor.ReadAll(), DATA); + } + + for (size_t i = 0; i < 10; ++i) { + compressor.Flush(); + } + + UNIT_ASSERT_EQUAL(input.ReadAll(), ""); + } + + Y_UNIT_TEST(CompressFlushSmallBuffer) { + for (size_t bufferSize = 16; bufferSize < 32; ++bufferSize) { + TString firstData = ""; + + for (size_t firstDataSize = 0; firstDataSize < 16; ++firstDataSize) { + TString secondData = ""; + + for (size_t secondDataSize = 0; secondDataSize < 16; ++secondDataSize) { + TTempFile tmpFile(ZDATA); + + TUnbufferedFileOutput output(ZDATA); + TZLibCompress 
compressor(TZLibCompress::TParams(&output).SetType(ZLib::ZLib).SetBufLen(bufferSize)); + + TUnbufferedFileInput input(ZDATA); + TZLibDecompress decompressor(&input); + + compressor.Write(firstData.data(), firstData.size()); + compressor.Flush(); + + UNIT_ASSERT_EQUAL(decompressor.ReadAll(), firstData); + + compressor.Write(secondData.data(), secondData.size()); + compressor.Flush(); + + UNIT_ASSERT_EQUAL(decompressor.ReadAll(), secondData); + + secondData += 'A' + secondDataSize; + } + + firstData += 'A' + firstDataSize; + } + } + } +} |