diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/stream | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/stream')
63 files changed, 2492 insertions, 2492 deletions
diff --git a/util/stream/aligned.cpp b/util/stream/aligned.cpp index 2fd12d15b7..2acc50c2bf 100644 --- a/util/stream/aligned.cpp +++ b/util/stream/aligned.cpp @@ -1,4 +1,4 @@ -#include "aligned.h" +#include "aligned.h" size_t TAlignedInput::DoRead(void* ptr, size_t len) { size_t ret = Stream_->Read(ptr, len); diff --git a/util/stream/aligned.h b/util/stream/aligned.h index 70e7be05a9..015c65dea0 100644 --- a/util/stream/aligned.h +++ b/util/stream/aligned.h @@ -22,7 +22,7 @@ public: , Position_(0) { } - + /** * Ensures alignment of the position in the input stream by skipping * some input. @@ -34,7 +34,7 @@ public: if (Position_ & (alignment - 1)) { size_t len = alignment - (Position_ & (alignment - 1)); - + do { len -= DoSkip(len); } while (len); @@ -63,7 +63,7 @@ public: , Position_(0) { } - + TAlignedOutput(TAlignedOutput&&) noexcept = default; TAlignedOutput& operator=(TAlignedOutput&&) noexcept = default; @@ -80,12 +80,12 @@ public: void Align(size_t alignment = sizeof(void*)) { Y_ASSERT(IsPowerOf2(alignment)); - static char unused[sizeof(void*) * 2]; + static char unused[sizeof(void*) * 2]; Y_ASSERT(alignment <= sizeof(unused)); - if (Position_ & (alignment - 1)) { + if (Position_ & (alignment - 1)) { DoWrite(unused, alignment - (Position_ & (alignment - 1))); - } + } } private: diff --git a/util/stream/aligned_ut.cpp b/util/stream/aligned_ut.cpp index e980d05cf7..633ea23dbe 100644 --- a/util/stream/aligned_ut.cpp +++ b/util/stream/aligned_ut.cpp @@ -4,16 +4,16 @@ class TNastyInputStream: public IInputStream { public: - TNastyInputStream() - : Pos_(0) - { - } + TNastyInputStream() + : Pos_(0) + { + } protected: size_t DoRead(void* buf, size_t len) override { - if (len == 0) { + if (len == 0) { return 0; - } + } *static_cast<unsigned char*>(buf) = static_cast<unsigned char>(Pos_); ++Pos_; @@ -21,9 +21,9 @@ protected: } size_t DoSkip(size_t len) override { - if (len == 0) { + if (len == 0) { return 0; - } + } ++Pos_; return 1; diff --git a/util/stream/buffer.cpp b/util/stream/buffer.cpp index 2facece4ea..78e65cdd46 100644 --- a/util/stream/buffer.cpp +++ b/util/stream/buffer.cpp @@ -1,16 +1,16 @@ -#include "buffer.h" -#include <util/generic/buffer.h> +#include "buffer.h" +#include <util/generic/buffer.h> #include <util/generic/yexception.h> - -class TBufferOutput::TImpl { -public: - inline TImpl(TBuffer& buf) - : Data_(buf) - { - } - + +class TBufferOutput::TImpl { +public: + inline TImpl(TBuffer& buf) + : Data_(buf) + { + } + virtual ~TImpl() = default; - + inline size_t DoNext(void** ptr) { if (Data_.Avail() == 0) { Data_.Reserve(FastClp2(Data_.Capacity() + MinBufferGrowSize)); @@ -26,51 +26,51 @@ public: Data_.Resize(Data_.size() - len); } - inline void DoWrite(const void* buf, size_t len) { - Data_.Append((const char*)buf, len); - } - + inline void DoWrite(const void* buf, size_t len) { + Data_.Append((const char*)buf, len); + } + inline void DoWriteC(char c) { Data_.Append(c); } inline TBuffer& Buffer() const noexcept { - return Data_; - } - -private: - TBuffer& Data_; + return Data_; + } + +private: + TBuffer& Data_; static constexpr size_t MinBufferGrowSize = 16; }; -namespace { +namespace { using TImpl = TBufferOutput::TImpl; - - class TOwnedImpl: private TBuffer, public TImpl { - public: - inline TOwnedImpl(size_t buflen) - : TBuffer(buflen) - , TImpl(static_cast<TBuffer&>(*this)) - { - } - }; -} - -TBufferOutput::TBufferOutput(size_t buflen) - : Impl_(new TOwnedImpl(buflen)) + + class TOwnedImpl: private TBuffer, public TImpl { + public: + inline TOwnedImpl(size_t buflen) + : TBuffer(buflen) + , TImpl(static_cast<TBuffer&>(*this)) + { + } + }; +} + +TBufferOutput::TBufferOutput(size_t buflen) + : Impl_(new TOwnedImpl(buflen)) { } -TBufferOutput::TBufferOutput(TBuffer& buffer) - : Impl_(new TImpl(buffer)) -{ +TBufferOutput::TBufferOutput(TBuffer& buffer) + : Impl_(new TImpl(buffer)) +{ } TBufferOutput::TBufferOutput(TBufferOutput&&) noexcept = default; TBufferOutput& TBufferOutput::operator=(TBufferOutput&&) noexcept = default; TBufferOutput::~TBufferOutput() = default; - + TBuffer& TBufferOutput::Buffer() const noexcept { return Impl_->Buffer(); } @@ -83,8 +83,8 @@ void TBufferOutput::DoUndo(size_t len) { Impl_->DoUndo(len); } -void TBufferOutput::DoWrite(const void* buf, size_t len) { - Impl_->DoWrite(buf, len); +void TBufferOutput::DoWrite(const void* buf, size_t len) { + Impl_->DoWrite(buf, len); } void TBufferOutput::DoWriteC(char c) { diff --git a/util/stream/buffer.h b/util/stream/buffer.h index 9dc99dbe49..0b3b94bb0a 100644 --- a/util/stream/buffer.h +++ b/util/stream/buffer.h @@ -1,11 +1,11 @@ #pragma once - + #include "zerocopy.h" #include "zerocopy_output.h" - + #include <util/generic/ptr.h> - -class TBuffer; + +class TBuffer; /** * @addtogroup Streams_Buffers @@ -16,25 +16,25 @@ class TBuffer; * Output stream that writes into a `TBuffer`. */ class TBufferOutput: public IZeroCopyOutput { -public: - class TImpl; - +public: + class TImpl; + /** * Constructs a stream that writes into an internal buffer. * * @param buflen Initial size of the internal buffer. - */ - TBufferOutput(size_t buflen = 1024); - + */ + TBufferOutput(size_t buflen = 1024); + /** * Constructs a stream that writes into the provided buffer. It's up to the * user to make sure that the buffer doesn't get destroyed while this stream * is in use. * * @param buffer Buffer to write into. - */ - TBufferOutput(TBuffer& buffer); - + */ + TBufferOutput(TBuffer& buffer); + TBufferOutput(TBufferOutput&&) noexcept; TBufferOutput& operator=(TBufferOutput&&) noexcept; @@ -44,22 +44,22 @@ public: * @returns Buffer that this stream writes into. */ TBuffer& Buffer() const noexcept; - -private: + +private: size_t DoNext(void** ptr) override; void DoUndo(size_t len) override; void DoWrite(const void* buf, size_t len) override; void DoWriteC(char c) override; - -private: - THolder<TImpl> Impl_; + +private: + THolder<TImpl> Impl_; }; /** * Input stream that reads from an external `TBuffer`. */ class TBufferInput: public IZeroCopyInputFastReadTo { -public: +public: /** * Constructs a stream that reads from an external buffer. It's up to the * user to make sure that the buffer doesn't get destroyed before this @@ -68,52 +68,52 @@ public: * @param buffer External buffer to read from. */ TBufferInput(const TBuffer& buffer); - + ~TBufferInput() override; - + const TBuffer& Buffer() const noexcept; void Rewind() noexcept; - + protected: size_t DoNext(const void** ptr, size_t len) override; void DoUndo(size_t len) override; -private: - const TBuffer& Buf_; - size_t Readed_; -}; - +private: + const TBuffer& Buf_; + size_t Readed_; +}; + /** * Input/output stream that works with a `TBuffer`. */ -class TBufferStream: public TBufferOutput, public TBufferInput { -public: +class TBufferStream: public TBufferOutput, public TBufferInput { +public: /** * Constructs a stream that works with an internal buffer. * * @param buflen Initial size of the internal buffer. */ - inline TBufferStream(size_t buflen = 1024) - : TBufferOutput(buflen) + inline TBufferStream(size_t buflen = 1024) + : TBufferOutput(buflen) , TBufferInput(TBufferOutput::Buffer()) - { - } - + { + } + /** * Constructs a stream that works with the provided buffer. * * @param buffer Buffer to work with. */ - inline TBufferStream(TBuffer& buffer) - : TBufferOutput(buffer) + inline TBufferStream(TBuffer& buffer) + : TBufferOutput(buffer) , TBufferInput(TBufferOutput::Buffer()) - { - } - + { + } + ~TBufferStream() override = default; using TBufferOutput::Buffer; -}; - +}; + /** @} */ diff --git a/util/stream/buffer_ut.cpp b/util/stream/buffer_ut.cpp index 3494696190..f9bf67a479 100644 --- a/util/stream/buffer_ut.cpp +++ b/util/stream/buffer_ut.cpp @@ -61,17 +61,17 @@ Y_UNIT_TEST_SUITE(TBufferTest) { Y_UNIT_TEST(Write) { TBuffer buffer; TBufferOutput output(buffer); - output << "1" - << "22" - << "333" - << "4444" - << "55555"; + output << "1" + << "22" + << "333" + << "4444" + << "55555"; UNIT_ASSERT(0 == memcmp(buffer.data(), "1" - "22" - "333" - "4444" - "55555", + "22" + "333" + "4444" + "55555", buffer.size())); } diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp index a00e592e1c..471944954c 100644 --- a/util/stream/buffered.cpp +++ b/util/stream/buffered.cpp @@ -1,40 +1,40 @@ -#include "mem.h" -#include "buffered.h" - -#include <util/memory/addstorage.h> -#include <util/generic/yexception.h> -#include <util/generic/buffer.h> - -class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> { -public: +#include "mem.h" +#include "buffered.h" + +#include <util/memory/addstorage.h> +#include <util/generic/yexception.h> +#include <util/generic/buffer.h> + +class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> { +public: inline TImpl(IInputStream* slave) - : Slave_(slave) + : Slave_(slave) , MemInput_(nullptr, 0) - { - } - + { + } + inline ~TImpl() = default; - + inline size_t Next(const void** ptr, size_t len) { - if (MemInput_.Exhausted()) { - MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); - } - + if (MemInput_.Exhausted()) { + MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); + } + return MemInput_.Next(ptr, len); - } - - inline size_t Read(void* buf, size_t len) { - if (MemInput_.Exhausted()) { - if (len > BufLen() / 2) { - return Slave_->Read(buf, len); - } - - MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); - } - - return MemInput_.Read(buf, len); - } - + } + + inline size_t Read(void* buf, size_t len) { + if (MemInput_.Exhausted()) { + if (len > BufLen() / 2) { + return Slave_->Read(buf, len); + } + + MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); + } + + return MemInput_.Read(buf, len); + } + inline size_t Skip(size_t len) { size_t totalSkipped = 0; while (len) { @@ -66,103 +66,103 @@ public: st.clear(); TString s_tmp; - + size_t ret = 0; - - while (true) { - if (MemInput_.Exhausted()) { + + while (true) { + if (MemInput_.Exhausted()) { const size_t bytesRead = Slave_->Read(Buf(), BufLen()); - + if (!bytesRead) { - break; - } - + break; + } + MemInput_.Reset(Buf(), bytesRead); - } - - const size_t a_len(MemInput_.Avail()); + } + + const size_t a_len(MemInput_.Avail()); size_t s_len = 0; if (st.empty()) { ret += MemInput_.ReadTo(st, to); s_len = st.length(); - } else { + } else { ret += MemInput_.ReadTo(s_tmp, to); s_len = s_tmp.length(); st.append(s_tmp); - } - - if (s_len != a_len) { - break; - } - } - - return ret; - } - + } + + if (s_len != a_len) { + break; + } + } + + return ret; + } + inline void Reset(IInputStream* slave) { - Slave_ = slave; - } - -private: + Slave_ = slave; + } + +private: inline size_t BufLen() const noexcept { - return AdditionalDataLength(); - } - + return AdditionalDataLength(); + } + inline void* Buf() const noexcept { - return AdditionalData(); - } - -private: + return AdditionalData(); + } + +private: IInputStream* Slave_; - TMemoryInput MemInput_; -}; - + TMemoryInput MemInput_; +}; + TBufferedInput::TBufferedInput(IInputStream* slave, size_t buflen) - : Impl_(new (buflen) TImpl(slave)) -{ -} - + : Impl_(new (buflen) TImpl(slave)) +{ +} + TBufferedInput::TBufferedInput(TBufferedInput&&) noexcept = default; TBufferedInput& TBufferedInput::operator=(TBufferedInput&&) noexcept = default; TBufferedInput::~TBufferedInput() = default; - -size_t TBufferedInput::DoRead(void* buf, size_t len) { - return Impl_->Read(buf, len); -} - + +size_t TBufferedInput::DoRead(void* buf, size_t len) { + return Impl_->Read(buf, len); +} + size_t TBufferedInput::DoSkip(size_t len) { return Impl_->Skip(len); } size_t TBufferedInput::DoNext(const void** ptr, size_t len) { - return Impl_->Next(ptr, len); -} - + return Impl_->Next(ptr, len); +} + size_t TBufferedInput::DoReadTo(TString& st, char ch) { - return Impl_->ReadTo(st, ch); -} - + return Impl_->ReadTo(st, ch); +} + void TBufferedInput::Reset(IInputStream* slave) { - Impl_->Reset(slave); -} - -class TBufferedOutputBase::TImpl { -public: + Impl_->Reset(slave); +} + +class TBufferedOutputBase::TImpl { +public: inline TImpl(IOutputStream* slave) - : Slave_(slave) - , MemOut_(nullptr, 0) - , PropagateFlush_(false) - , PropagateFinish_(false) - { - } - + : Slave_(slave) + , MemOut_(nullptr, 0) + , PropagateFlush_(false) + , PropagateFinish_(false) + { + } + virtual ~TImpl() = default; - - inline void Reset() { - MemOut_.Reset(Buf(), Len()); - } - + + inline void Reset() { + MemOut_.Reset(Buf(), Len()); + } + inline size_t Next(void** ptr) { if (MemOut_.Avail() == 0) { Slave_->Write(Buf(), Stored()); @@ -178,45 +178,45 @@ public: MemOut_.Undo(len); } - inline void Write(const void* buf, size_t len) { - if (len <= MemOut_.Avail()) { - /* - * fast path - */ - - MemOut_.Write(buf, len); - } else { - const size_t stored = Stored(); - const size_t full_len = stored + len; - const size_t good_len = DownToBufferGranularity(full_len); - const size_t write_from_buf = good_len - stored; - + inline void Write(const void* buf, size_t len) { + if (len <= MemOut_.Avail()) { + /* + * fast path + */ + + MemOut_.Write(buf, len); + } else { + const size_t stored = Stored(); + const size_t full_len = stored + len; + const size_t good_len = DownToBufferGranularity(full_len); + const size_t write_from_buf = good_len - stored; + using TPart = IOutputStream::TPart; - + alignas(TPart) char data[2 * sizeof(TPart)]; TPart* parts = reinterpret_cast<TPart*>(data); - TPart* end = parts; - - if (stored) { + TPart* end = parts; + + if (stored) { new (end++) TPart(Buf(), stored); - } - - if (write_from_buf) { + } + + if (write_from_buf) { new (end++) TPart(buf, write_from_buf); - } - - Slave_->Write(parts, end - parts); - - //grow buffer only on full flushes - OnBufferExhausted(); - Reset(); - - if (write_from_buf < len) { - MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf); - } - } - } - + } + + Slave_->Write(parts, end - parts); + + //grow buffer only on full flushes + OnBufferExhausted(); + Reset(); + + if (write_from_buf < len) { + MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf); + } + } + } + inline void Write(char c) { if (Y_UNLIKELY(MemOut_.Avail() == 0)) { Slave_->Write(Buf(), Stored()); @@ -228,145 +228,145 @@ public: } inline void SetFlushPropagateMode(bool mode) noexcept { - PropagateFlush_ = mode; - } - + PropagateFlush_ = mode; + } + inline void SetFinishPropagateMode(bool mode) noexcept { - PropagateFinish_ = mode; - } - - inline void Flush() { - { - Slave_->Write(Buf(), Stored()); - Reset(); - } - - if (PropagateFlush_) { - Slave_->Flush(); - } - } - - inline void Finish() { - try { - Flush(); - } catch (...) { + PropagateFinish_ = mode; + } + + inline void Flush() { + { + Slave_->Write(Buf(), Stored()); + Reset(); + } + + if (PropagateFlush_) { + Slave_->Flush(); + } + } + + inline void Finish() { + try { + Flush(); + } catch (...) { try { - DoFinish(); + DoFinish(); } catch (...) { - // ¯\_(ツ)_/¯ + // ¯\_(ツ)_/¯ } - throw; + throw; } - DoFinish(); - } - -private: - inline void DoFinish() { - if (PropagateFinish_) { - Slave_->Finish(); - } - } - + DoFinish(); + } + +private: + inline void DoFinish() { + if (PropagateFinish_) { + Slave_->Finish(); + } + } + inline size_t Stored() const noexcept { - return Len() - MemOut_.Avail(); - } - + return Len() - MemOut_.Avail(); + } + inline size_t DownToBufferGranularity(size_t l) const noexcept { - return l - (l % Len()); - } - - virtual void OnBufferExhausted() = 0; + return l - (l % Len()); + } + + virtual void OnBufferExhausted() = 0; virtual void* Buf() const noexcept = 0; virtual size_t Len() const noexcept = 0; - -private: + +private: IOutputStream* Slave_; - TMemoryOutput MemOut_; - bool PropagateFlush_; - bool PropagateFinish_; -}; - -namespace { - struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> { + TMemoryOutput MemOut_; + bool PropagateFlush_; + bool PropagateFinish_; +}; + +namespace { + struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> { inline TSimpleImpl(IOutputStream* slave) - : TBufferedOutputBase::TImpl(slave) - { - Reset(); - } - + : TBufferedOutputBase::TImpl(slave) + { + Reset(); + } + ~TSimpleImpl() override = default; - + void OnBufferExhausted() final { - } - + } + void* Buf() const noexcept override { - return AdditionalData(); - } - + return AdditionalData(); + } + size_t Len() const noexcept override { - return AdditionalDataLength(); - } - }; - - struct TAdaptiveImpl: public TBufferedOutputBase::TImpl { - enum { - Step = 4096 - }; - + return AdditionalDataLength(); + } + }; + + struct TAdaptiveImpl: public TBufferedOutputBase::TImpl { + enum { + Step = 4096 + }; + inline TAdaptiveImpl(IOutputStream* slave) - : TBufferedOutputBase::TImpl(slave) - , N_(0) - { - B_.Reserve(Step); - Reset(); - } - + : TBufferedOutputBase::TImpl(slave) + , N_(0) + { + B_.Reserve(Step); + Reset(); + } + ~TAdaptiveImpl() override = default; - + void OnBufferExhausted() final { - const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10); - - if (c > B_.Capacity()) { - TBuffer(c).Swap(B_); - } - } - + const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10); + + if (c > B_.Capacity()) { + TBuffer(c).Swap(B_); + } + } + void* Buf() const noexcept override { - return (void*)B_.Data(); - } - + return (void*)B_.Data(); + } + size_t Len() const noexcept override { - return B_.Capacity(); - } - - TBuffer B_; - ui64 N_; - }; -} - + return B_.Capacity(); + } + + TBuffer B_; + ui64 N_; + }; +} + TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave) - : Impl_(new TAdaptiveImpl(slave)) -{ -} - + : Impl_(new TAdaptiveImpl(slave)) +{ +} + TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave, size_t buflen) - : Impl_(new (buflen) TSimpleImpl(slave)) -{ -} - + : Impl_(new (buflen) TSimpleImpl(slave)) +{ +} + TBufferedOutputBase::TBufferedOutputBase(TBufferedOutputBase&&) noexcept = default; TBufferedOutputBase& TBufferedOutputBase::operator=(TBufferedOutputBase&&) noexcept = default; TBufferedOutputBase::~TBufferedOutputBase() { - try { - Finish(); - } catch (...) { - // ¯\_(ツ)_/¯ - } -} - + try { + Finish(); + } catch (...) { + // ¯\_(ツ)_/¯ + } +} + size_t TBufferedOutputBase::DoNext(void** ptr) { Y_ENSURE(Impl_.Get(), "cannot call next in finished stream"); return Impl_->Next(ptr); @@ -377,52 +377,52 @@ void TBufferedOutputBase::DoUndo(size_t len) { Impl_->Undo(len); } -void TBufferedOutputBase::DoWrite(const void* data, size_t len) { +void TBufferedOutputBase::DoWrite(const void* data, size_t len) { Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); Impl_->Write(data, len); -} - +} + void TBufferedOutputBase::DoWriteC(char c) { Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); Impl_->Write(c); } -void TBufferedOutputBase::DoFlush() { - if (Impl_.Get()) { - Impl_->Flush(); - } -} - -void TBufferedOutputBase::DoFinish() { - THolder<TImpl> impl(Impl_.Release()); - +void TBufferedOutputBase::DoFlush() { + if (Impl_.Get()) { + Impl_->Flush(); + } +} + +void TBufferedOutputBase::DoFinish() { + THolder<TImpl> impl(Impl_.Release()); + if (impl) { - impl->Finish(); - } -} - + impl->Finish(); + } +} + void TBufferedOutputBase::SetFlushPropagateMode(bool propagate) noexcept { - if (Impl_.Get()) { - Impl_->SetFlushPropagateMode(propagate); - } -} - + if (Impl_.Get()) { + Impl_->SetFlushPropagateMode(propagate); + } +} + void TBufferedOutputBase::SetFinishPropagateMode(bool propagate) noexcept { - if (Impl_.Get()) { - Impl_->SetFinishPropagateMode(propagate); - } -} - + if (Impl_.Get()) { + Impl_->SetFinishPropagateMode(propagate); + } +} + TBufferedOutput::TBufferedOutput(IOutputStream* slave, size_t buflen) - : TBufferedOutputBase(slave, buflen) -{ -} - + : TBufferedOutputBase(slave, buflen) +{ +} + TBufferedOutput::~TBufferedOutput() = default; - + TAdaptiveBufferedOutput::TAdaptiveBufferedOutput(IOutputStream* slave) - : TBufferedOutputBase(slave) -{ -} - + : TBufferedOutputBase(slave) +{ +} + TAdaptiveBufferedOutput::~TAdaptiveBufferedOutput() = default; diff --git a/util/stream/buffered.h b/util/stream/buffered.h index 0847186141..e43d7ca406 100644 --- a/util/stream/buffered.h +++ b/util/stream/buffered.h @@ -1,13 +1,13 @@ #pragma once - + #include "zerocopy.h" #include "zerocopy_output.h" - + #include <utility> #include <util/generic/ptr.h> -#include <util/generic/typetraits.h> -#include <util/generic/store_policy.h> - +#include <util/generic/typetraits.h> +#include <util/generic/store_policy.h> + /** * @addtogroup Streams_Buffered * @{ @@ -21,33 +21,33 @@ * to the user to free it. */ class TBufferedInput: public IZeroCopyInput { -public: +public: TBufferedInput(IInputStream* slave, size_t buflen = 8192); TBufferedInput(TBufferedInput&&) noexcept; TBufferedInput& operator=(TBufferedInput&&) noexcept; ~TBufferedInput() override; - + /** * Switches the underlying stream to the one provided. Does not clear the * data that was already buffered. * * @param slave New underlying stream. - */ + */ void Reset(IInputStream* slave); - -protected: + +protected: size_t DoRead(void* buf, size_t len) override; size_t DoReadTo(TString& st, char ch) override; size_t DoSkip(size_t len) override; size_t DoNext(const void** ptr, size_t len) override; - -private: - class TImpl; - THolder<TImpl> Impl_; -}; - + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + /** * Output stream that wraps the given stream and adds a buffer on top of it, * thus making sure that data is written to the underlying stream in big chunks. @@ -60,7 +60,7 @@ private: * so it's up to the user to free it. */ class TBufferedOutputBase: public IZeroCopyOutput { -public: +public: /** * Constructs a buffered stream that dynamically adjusts the size of the * buffer. This works best when the amount of data that will be passed @@ -83,17 +83,17 @@ public: TBufferedOutputBase& operator=(TBufferedOutputBase&&) noexcept; ~TBufferedOutputBase() override; - + /** * @param propagate Whether `Flush` and `Finish` calls should * be propagated to the underlying stream. * By default they are not. - */ + */ inline void SetPropagateMode(bool propagate) noexcept { - SetFlushPropagateMode(propagate); - SetFinishPropagateMode(propagate); - } - + SetFlushPropagateMode(propagate); + SetFinishPropagateMode(propagate); + } + /** * @param propagate Whether `Flush` calls should be propagated * to the underlying stream. By default they @@ -107,65 +107,65 @@ public: * are not. */ void SetFinishPropagateMode(bool propagate) noexcept; - - class TImpl; - -protected: + + class TImpl; + +protected: size_t DoNext(void** ptr) override; void DoUndo(size_t len) override; void DoWrite(const void* data, size_t len) override; void DoWriteC(char c) override; void DoFlush() override; void DoFinish() override; - -private: - THolder<TImpl> Impl_; -}; - + +private: + THolder<TImpl> Impl_; +}; + /** * Buffered output stream with a fixed-size buffer. * * @see TBufferedOutputBase */ -class TBufferedOutput: public TBufferedOutputBase { -public: +class TBufferedOutput: public TBufferedOutputBase { +public: TBufferedOutput(IOutputStream* slave, size_t buflen = 8192); ~TBufferedOutput() override; TBufferedOutput(TBufferedOutput&&) noexcept = default; TBufferedOutput& operator=(TBufferedOutput&&) noexcept = default; -}; - +}; + /** * Buffered output stream that dynamically adjusts the size of the buffer based * on the amount of data that's passed through it. * * @see TBufferedOutputBase */ -class TAdaptiveBufferedOutput: public TBufferedOutputBase { -public: +class TAdaptiveBufferedOutput: public TBufferedOutputBase { +public: TAdaptiveBufferedOutput(IOutputStream* slave); ~TAdaptiveBufferedOutput() override; TAdaptiveBufferedOutput(TAdaptiveBufferedOutput&&) noexcept = default; TAdaptiveBufferedOutput& operator=(TAdaptiveBufferedOutput&&) noexcept = default; -}; - -namespace NPrivate { - struct TMyBufferedOutput: public TBufferedOutput { +}; + +namespace NPrivate { + struct TMyBufferedOutput: public TBufferedOutput { inline TMyBufferedOutput(IOutputStream* slave, size_t buflen) - : TBufferedOutput(slave, buflen) - { - SetFinishPropagateMode(true); - } - }; - - template <class T> - struct TBufferedStreamFor { + : TBufferedOutput(slave, buflen) + { + SetFinishPropagateMode(true); + } + }; + + template <class T> + struct TBufferedStreamFor { using TResult = std::conditional_t<std::is_base_of<IInputStream, T>::value, TBufferedInput, TMyBufferedOutput>; - }; -} - + }; +} + /** * A mixin class that turns unbuffered stream into a buffered one. * @@ -179,30 +179,30 @@ namespace NPrivate { * TBuffered<TUnbufferedFileOutput> file_output(1024, "/path/to/file"); * @endcode * Here 1024 is the size of the buffer. - */ -template <class TSlave> -class TBuffered: private TEmbedPolicy<TSlave>, public ::NPrivate::TBufferedStreamFor<TSlave>::TResult { + */ +template <class TSlave> +class TBuffered: private TEmbedPolicy<TSlave>, public ::NPrivate::TBufferedStreamFor<TSlave>::TResult { using TSlaveBase = TEmbedPolicy<TSlave>; using TBufferedBase = typename ::NPrivate::TBufferedStreamFor<TSlave>::TResult; - -public: - template <typename... Args> - inline TBuffered(size_t b, Args&&... args) + +public: + template <typename... Args> + inline TBuffered(size_t b, Args&&... args) : TSlaveBase(std::forward<Args>(args)...) - , TBufferedBase(TSlaveBase::Ptr(), b) - { - } - - inline TSlave& Slave() noexcept { - return *this->Ptr(); - } + , TBufferedBase(TSlaveBase::Ptr(), b) + { + } + + inline TSlave& Slave() noexcept { + return *this->Ptr(); + } TBuffered(const TBuffered&) = delete; TBuffered& operator=(const TBuffered&) = delete; TBuffered(TBuffered&&) = delete; TBuffered& operator=(TBuffered&&) = delete; -}; - +}; + /** * A mixin class that turns unbuffered stream into an adaptively buffered one. * Created stream differs from the one created via `TBuffered` template in that @@ -214,22 +214,22 @@ public: * TAdaptivelyBuffered<TUnbufferedFileOutput> file_output("/path/to/file"); * @endcode */ -template <class TSlave> -class TAdaptivelyBuffered: private TEmbedPolicy<TSlave>, public TAdaptiveBufferedOutput { +template <class TSlave> +class TAdaptivelyBuffered: private TEmbedPolicy<TSlave>, public TAdaptiveBufferedOutput { using TSlaveBase = TEmbedPolicy<TSlave>; - -public: - template <typename... Args> - inline TAdaptivelyBuffered(Args&&... args) + +public: + template <typename... Args> + inline TAdaptivelyBuffered(Args&&... args) : TSlaveBase(std::forward<Args>(args)...) - , TAdaptiveBufferedOutput(TSlaveBase::Ptr()) - { - } + , TAdaptiveBufferedOutput(TSlaveBase::Ptr()) + { + } TAdaptivelyBuffered(const TAdaptivelyBuffered&) = delete; TAdaptivelyBuffered& operator=(const TAdaptivelyBuffered&) = delete; TAdaptivelyBuffered(TAdaptivelyBuffered&& other) = delete; TAdaptivelyBuffered& operator=(TAdaptivelyBuffered&& other) = delete; -}; - +}; + /** @} */ diff --git a/util/stream/buffered_ut.cpp b/util/stream/buffered_ut.cpp index 41d2fc3030..bc5b9361a9 100644 --- a/util/stream/buffered_ut.cpp +++ b/util/stream/buffered_ut.cpp @@ -1,64 +1,64 @@ -#include "buffered.h" - +#include "buffered.h" + #include <library/cpp/testing/unittest/registar.h> - + #include <util/generic/string.h> -#include <util/random/mersenne.h> - +#include <util/random/mersenne.h> + Y_UNIT_TEST_SUITE(TestBufferedIO) { - template <class TOut> - inline void Run(TOut&& out) { - TMersenne<ui64> r; - - for (size_t i = 0; i < 1000; ++i) { - const size_t c = r.GenRand() % 10000; + template <class TOut> + inline void Run(TOut&& out) { + TMersenne<ui64> r; + + for (size_t i = 0; i < 1000; ++i) { + const size_t c = r.GenRand() % 10000; TString s; - - for (size_t j = 0; j < c; ++j) { - s.append('A' + (r.GenRand() % 10)); - } - + + for (size_t j = 0; j < c; ++j) { + s.append('A' + (r.GenRand() % 10)); + } + out.Write(s.data(), s.size()); - } - } - + } + } + Y_UNIT_TEST(TestEqual) { TString s1; TString s2; - - Run(TBuffered<TStringOutput>(8000, s1)); - Run(TAdaptivelyBuffered<TStringOutput>(s2)); - - UNIT_ASSERT_VALUES_EQUAL(s1, s2); - } - + + Run(TBuffered<TStringOutput>(8000, s1)); + Run(TAdaptivelyBuffered<TStringOutput>(s2)); + + UNIT_ASSERT_VALUES_EQUAL(s1, s2); + } + Y_UNIT_TEST(Test1) { TString s; - - TBuffered<TStringOutput>(100, s).Write("1", 1); - - UNIT_ASSERT_VALUES_EQUAL(s, "1"); - } - + + TBuffered<TStringOutput>(100, s).Write("1", 1); + + UNIT_ASSERT_VALUES_EQUAL(s, "1"); + } + Y_UNIT_TEST(Test2) { TString s; - - TBuffered<TStringOutput>(1, s).Write("12", 2); - - UNIT_ASSERT_VALUES_EQUAL(s, "12"); - } - + + TBuffered<TStringOutput>(1, s).Write("12", 2); + + UNIT_ASSERT_VALUES_EQUAL(s, "12"); + } + Y_UNIT_TEST(Test3) { TString s; - - auto&& b = TBuffered<TStringOutput>(1, s); - - b.Write("1", 1); - b.Write("12", 2); + + auto&& b = TBuffered<TStringOutput>(1, s); + + b.Write("1", 1); + b.Write("12", 2); b.Finish(); - - UNIT_ASSERT_VALUES_EQUAL(s, "112"); - } + + UNIT_ASSERT_VALUES_EQUAL(s, "112"); + } Y_UNIT_TEST(Test4) { TString s; @@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(TestBufferedIO) { } template <class TOut> - inline void DoGenAndWrite(TOut&& output, TString& str) { + inline void DoGenAndWrite(TOut&& output, TString& str) { TMersenne<ui64> r; for (size_t i = 0; i < 43210; ++i) { str.append('A' + (r.GenRand() % 10)); @@ -111,21 +111,21 @@ Y_UNIT_TEST_SUITE(TestBufferedIO) { TString s("0123456789abcdefghijklmn"); TBuffered<TStringInput> in(5, s); char c; - UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //1 + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //1 UNIT_ASSERT_VALUES_EQUAL(c, '0'); - UNIT_ASSERT_VALUES_EQUAL(in.Skip(4), 4); //5 end of buffer - UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //6 + UNIT_ASSERT_VALUES_EQUAL(in.Skip(4), 4); //5 end of buffer + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //6 UNIT_ASSERT_VALUES_EQUAL(c, '5'); - UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //9 - UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //10 end of buffer + UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //9 + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //10 end of buffer UNIT_ASSERT_VALUES_EQUAL(c, '9'); - UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //13 - UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //14 start new buffer + UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //13 + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //14 start new buffer UNIT_ASSERT_VALUES_EQUAL(c, 'd'); - UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 6); //20 - UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //21 start new buffer + UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 6); //20 + UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //21 start new buffer UNIT_ASSERT_VALUES_EQUAL(c, 'k'); - UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 3); //24 eof + UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 3); //24 eof } Y_UNIT_TEST(TestReadTo) { @@ -139,4 +139,4 @@ Y_UNIT_TEST_SUITE(TestBufferedIO) { UNIT_ASSERT_VALUES_EQUAL(in.ReadTo(t, 'z'), 4); UNIT_ASSERT_VALUES_EQUAL(t, "9abc"); } -} +} diff --git a/util/stream/debug.cpp b/util/stream/debug.cpp index afd5b3e1c7..35c43331ac 100644 --- a/util/stream/debug.cpp +++ b/util/stream/debug.cpp @@ -1,49 +1,49 @@ -#include "null.h" -#include "debug.h" - -#include <util/string/cast.h> -#include <util/generic/singleton.h> -#include <util/generic/yexception.h> - -#include <cstdio> -#include <cstdlib> - -void TDebugOutput::DoWrite(const void* buf, size_t len) { - if (len != fwrite(buf, 1, len, stderr)) { +#include "null.h" +#include "debug.h" + +#include <util/string/cast.h> +#include <util/generic/singleton.h> +#include <util/generic/yexception.h> + +#include <cstdio> +#include <cstdlib> + +void TDebugOutput::DoWrite(const void* buf, size_t len) { + if (len != fwrite(buf, 1, len, stderr)) { ythrow yexception() << "write failed(" << LastSystemErrorText() << ")"; - } -} - -namespace { - struct TDbgSelector { - inline TDbgSelector() { - char* dbg = getenv("DBGOUT"); - if (dbg) { - Out = &Cerr; - try { - Level = FromString(dbg); - } catch (const yexception&) { - Level = 0; - } - } else { - Out = &Cnull; + } +} + +namespace { + struct TDbgSelector { + inline TDbgSelector() { + char* dbg = getenv("DBGOUT"); + if (dbg) { + Out = &Cerr; + try { + Level = FromString(dbg); + } catch (const yexception&) { + Level = 0; + } + } else { + Out = &Cnull; Level = 0; } } IOutputStream* Out; - int Level; - }; -} - + int Level; + }; +} + template <> struct TSingletonTraits<TDbgSelector> { static constexpr size_t Priority = 8; }; IOutputStream& StdDbgStream() noexcept { - return *(Singleton<TDbgSelector>()->Out); -} + return *(Singleton<TDbgSelector>()->Out); +} int StdDbgLevel() noexcept { return Singleton<TDbgSelector>()->Level; diff --git a/util/stream/debug.h b/util/stream/debug.h index 92d6d4b42d..cd7e42c146 100644 --- a/util/stream/debug.h +++ b/util/stream/debug.h @@ -1,7 +1,7 @@ #pragma once - -#include "output.h" - + +#include "output.h" + /** * @addtogroup Streams * @{ @@ -11,23 +11,23 @@ * Debug output stream. Writes into `stderr`. */ class TDebugOutput: public IOutputStream { -public: +public: inline TDebugOutput() noexcept = default; ~TDebugOutput() override = default; - + TDebugOutput(TDebugOutput&&) noexcept = default; TDebugOutput& operator=(TDebugOutput&&) noexcept = default; -private: +private: void DoWrite(const void* buf, size_t len) override; -}; - +}; + /** * @returns Standard debug stream. * @see Cdbg */ IOutputStream& StdDbgStream() noexcept; - + /** * This function returns the current debug level as set via `DBGOUT` environment * variable. @@ -48,6 +48,6 @@ int StdDbgLevel() noexcept; * If this variable is set, then this stream is redirected into `stderr`, * otherwise whatever is written into it is simply ignored. */ -#define Cdbg (StdDbgStream()) - +#define Cdbg (StdDbgStream()) + /** @} */ diff --git a/util/stream/direct_io.cpp b/util/stream/direct_io.cpp index 649033af34..186154f4f6 100644 --- a/util/stream/direct_io.cpp +++ b/util/stream/direct_io.cpp @@ -1,7 +1,7 @@ #include "direct_io.h" -#include <util/generic/utility.h> - +#include <util/generic/utility.h> + size_t TRandomAccessFileInput::DoRead(void* buf, size_t len) { const size_t result = File.Pread(buf, len, Position); Position += result; @@ -15,7 +15,7 @@ TRandomAccessFileInput::TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 } size_t TRandomAccessFileInput::DoSkip(size_t len) { - size_t skiped = Min(len, (size_t)Min((ui64)Max<size_t>(), File.GetLength() - Position)); + size_t skiped = Min(len, (size_t)Min((ui64)Max<size_t>(), File.GetLength() - Position)); Position += skiped; return skiped; } diff --git a/util/stream/file.cpp b/util/stream/file.cpp index dc5d2f6311..da2ae8a2b3 100644 --- a/util/stream/file.cpp +++ b/util/stream/file.cpp @@ -1,30 +1,30 @@ -#include "file.h" - +#include "file.h" + #include <util/memory/blob.h> -#include <util/generic/yexception.h> - +#include <util/generic/yexception.h> + TUnbufferedFileInput::TUnbufferedFileInput(const TString& path) - : File_(path, OpenExisting | RdOnly | Seq) + : File_(path, OpenExisting | RdOnly | Seq) { - if (!File_.IsOpen()) { - ythrow TIoException() << "file " << path << " not open"; + if (!File_.IsOpen()) { + ythrow TIoException() << "file " << path << " not open"; } } TUnbufferedFileInput::TUnbufferedFileInput(const TFile& file) - : File_(file) -{ - if (!File_.IsOpen()) { - ythrow TIoException() << "file (" << file.GetName() << ") not open"; - } -} - + : File_(file) +{ + if (!File_.IsOpen()) { + ythrow TIoException() << "file (" << file.GetName() << ") not open"; + } +} + size_t TUnbufferedFileInput::DoRead(void* buf, size_t len) { return File_.ReadOrFail(buf, len); -} - +} + size_t TUnbufferedFileInput::DoSkip(size_t len) { - if (len < 384) { + if (len < 384) { /* Base implementation calls DoRead, which results in one system call * instead of three as in fair skip implementation. For small sizes * actually doing one read is cheaper. Experiments show that the @@ -32,66 +32,66 @@ size_t TUnbufferedFileInput::DoSkip(size_t len) { * in the range of 384-512 bytes (assuming that the file is in OS cache). */ return IInputStream::DoSkip(len); } - - /* TFile::Seek can seek beyond the end of file, so we need to do - * size check here. */ - i64 size = File_.GetLength(); - i64 oldPos = File_.GetPosition(); - i64 newPos = File_.Seek(Min<i64>(size, oldPos + len), sSet); - - return newPos - oldPos; + + /* TFile::Seek can seek beyond the end of file, so we need to do + * size check here. */ + i64 size = File_.GetLength(); + i64 oldPos = File_.GetPosition(); + i64 newPos = File_.Seek(Min<i64>(size, oldPos + len), sSet); + + return newPos - oldPos; } TUnbufferedFileOutput::TUnbufferedFileOutput(const TString& path) : File_(path, CreateAlways | WrOnly | Seq) -{ - if (!File_.IsOpen()) { - ythrow TFileError() << "can not open " << path; - } -} - +{ + if (!File_.IsOpen()) { + ythrow TFileError() << "can not open " << path; + } +} + TUnbufferedFileOutput::TUnbufferedFileOutput(const TFile& file) - : File_(file) -{ - if (!File_.IsOpen()) { - ythrow TIoException() << "closed file(" << file.GetName() << ") passed"; - } -} - + : File_(file) +{ + if (!File_.IsOpen()) { + ythrow TIoException() << "closed file(" << file.GetName() << ") passed"; + } +} + TUnbufferedFileOutput::~TUnbufferedFileOutput() = default; - + void TUnbufferedFileOutput::DoWrite(const void* buf, size_t len) { - File_.Write(buf, len); -} - + File_.Write(buf, len); +} + void TUnbufferedFileOutput::DoFlush() { - if (File_.IsOpen()) { - File_.Flush(); - } -} - -class TMappedFileInput::TImpl: public TBlob { -public: - inline TImpl(TFile file) - : TBlob(TBlob::FromFile(file)) - { + if (File_.IsOpen()) { + File_.Flush(); } - +} + +class TMappedFileInput::TImpl: public TBlob { +public: + inline TImpl(TFile file) + : TBlob(TBlob::FromFile(file)) + { + } + inline ~TImpl() = default; -}; - +}; + TMappedFileInput::TMappedFileInput(const TFile& file) : TMemoryInput(nullptr, 0) - , Impl_(new TImpl(file)) -{ - Reset(Impl_->Data(), Impl_->Size()); -} - + , Impl_(new TImpl(file)) +{ + Reset(Impl_->Data(), Impl_->Size()); +} + TMappedFileInput::TMappedFileInput(const TString& path) : TMemoryInput(nullptr, 0) - , Impl_(new TImpl(TFile(path, OpenExisting | RdOnly))) + , Impl_(new TImpl(TFile(path, OpenExisting | RdOnly))) { - Reset(Impl_->Data(), Impl_->Size()); + Reset(Impl_->Data(), Impl_->Size()); } TMappedFileInput::~TMappedFileInput() = default; diff --git a/util/stream/file.h b/util/stream/file.h index c1cf4f591d..ece0ed3dc8 100644 --- a/util/stream/file.h +++ b/util/stream/file.h @@ -3,16 +3,16 @@ #include "fwd.h" #include "input.h" #include "output.h" -#include "buffered.h" +#include "buffered.h" #include "mem.h" - + #include <util/system/file.h> #include <utility> - + /** * @addtogroup Streams_Files * @{ - */ + */ /** * Unbuffered file input stream. @@ -21,32 +21,32 @@ * be _very_ slow. */ class TUnbufferedFileInput: public IInputStream { -public: +public: TUnbufferedFileInput(const TFile& file); TUnbufferedFileInput(const TString& path); - -private: + +private: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; - -private: - TFile File_; -}; - + +private: + TFile File_; +}; + /** * Memory-mapped file input stream. */ -class TMappedFileInput: public TMemoryInput { -public: - TMappedFileInput(const TFile& file); +class TMappedFileInput: public TMemoryInput { +public: + TMappedFileInput(const TFile& file); TMappedFileInput(const TString& path); ~TMappedFileInput() override; - -private: - class TImpl; - THolder<TImpl> Impl_; -}; - + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + /** * File output stream. * @@ -54,35 +54,35 @@ private: * likely to be quite slow. */ class TUnbufferedFileOutput: public IOutputStream { -public: +public: TUnbufferedFileOutput(const TString& path); TUnbufferedFileOutput(const TFile& file); ~TUnbufferedFileOutput() override; - + TUnbufferedFileOutput(TUnbufferedFileOutput&&) noexcept = default; TUnbufferedFileOutput& operator=(TUnbufferedFileOutput&&) noexcept = default; -private: +private: void DoWrite(const void* buf, size_t len) override; void DoFlush() override; - -private: - TFile File_; -}; - + +private: + TFile File_; +}; + /** * Buffered file input stream. * * @see TBuffered */ class TFileInput: public TBuffered<TUnbufferedFileInput> { -public: - template <class T> +public: + template <class T> inline TFileInput(T&& t, size_t buf = 1 << 13) : TBuffered<TUnbufferedFileInput>(buf, std::forward<T>(t)) - { - } - + { + } + ~TFileInput() override = default; }; @@ -95,13 +95,13 @@ public: * @see TBuffered */ class TFixedBufferFileOutput: public TBuffered<TUnbufferedFileOutput> { -public: - template <class T> +public: + template <class T> inline TFixedBufferFileOutput(T&& t, size_t buf = 1 << 13) : TBuffered<TUnbufferedFileOutput>(buf, std::forward<T>(t)) - { - } - + { + } + ~TFixedBufferFileOutput() override = default; }; diff --git a/util/stream/format.cpp b/util/stream/format.cpp index 3996130df5..3eabd45eb4 100644 --- a/util/stream/format.cpp +++ b/util/stream/format.cpp @@ -1,14 +1,14 @@ -#include "format.h" -#include "output.h" - +#include "format.h" +#include "output.h" + #include <util/generic/ymath.h> -#include <util/string/cast.h> - +#include <util/string/cast.h> + namespace NFormatPrivate { static inline i64 Round(double value) { double res1 = floor(value); double res2 = ceil(value); - return (value - res1 < res2 - value) ? (i64)res1 : (i64)res2; + return (value - res1 < res2 - value) ? (i64)res1 : (i64)res2; } static inline IOutputStream& PrintDoubleShortly(IOutputStream& os, const double& d) { @@ -34,9 +34,9 @@ namespace NFormatPrivate { return os << Prec(d, mode, ndigits); } -} +} -template <> +template <> void Out<NFormatPrivate::THumanReadableSize>(IOutputStream& stream, const NFormatPrivate::THumanReadableSize& value) { ui64 base = value.Format == SF_BYTES ? 1024 : 1000; ui64 base2 = base * base; @@ -70,7 +70,7 @@ void Out<NFormatPrivate::THumanReadableSize>(IOutputStream& stream, const NForma } } -template <> +template <> void Out<NFormatPrivate::THumanReadableDuration>(IOutputStream& os, const NFormatPrivate::THumanReadableDuration& hr) { TTempBuf buf; TMemoryOutput ss(buf.Data(), buf.Size()); @@ -82,11 +82,11 @@ void Out<NFormatPrivate::THumanReadableDuration>(IOutputStream& os, const NForma break; } if (microSeconds < 1000 * 1000) { - NFormatPrivate::PrintDoubleShortly(ss, (double)microSeconds / 1000.0) << "ms"; + NFormatPrivate::PrintDoubleShortly(ss, (double)microSeconds / 1000.0) << "ms"; break; } - double seconds = (double)(hr.Value.MilliSeconds()) / 1000.0; + double seconds = (double)(hr.Value.MilliSeconds()) / 1000.0; if (seconds < 60) { NFormatPrivate::PrintDoubleShortly(ss, seconds) << 's'; break; @@ -109,9 +109,9 @@ void Out<NFormatPrivate::THumanReadableDuration>(IOutputStream& os, const NForma for (size_t i = 0; i < Y_ARRAY_SIZE(times); ++i) { if (times[i] > 0) { - if (!first) { + if (!first) { ss << ' '; - } + } ss << times[i] << names[i]; first = false; } @@ -123,12 +123,12 @@ void Out<NFormatPrivate::THumanReadableDuration>(IOutputStream& os, const NForma } void Time(IOutputStream& l) { - l << millisec(); -} - + l << millisec(); +} + void TimeHumanReadable(IOutputStream& l) { - char timeStr[30]; + char timeStr[30]; const time_t t = time(nullptr); - - l << ctime_r(&t, timeStr); + + l << ctime_r(&t, timeStr); } diff --git a/util/stream/format.h b/util/stream/format.h index b033208a1b..5ad2a4ec75 100644 --- a/util/stream/format.h +++ b/util/stream/format.h @@ -1,13 +1,13 @@ #pragma once -#include "mem.h" -#include "output.h" - +#include "mem.h" +#include "output.h" + #include <util/datetime/base.h> #include <util/generic/strbuf.h> #include <util/generic/flags.h> #include <util/memory/tempbuf.h> -#include <util/string/cast.h> +#include <util/string/cast.h> enum ENumberFormatFlag { HF_FULL = 0x01, /**< Output number with leading zeros. */ @@ -17,16 +17,16 @@ Y_DECLARE_FLAGS(ENumberFormat, ENumberFormatFlag) Y_DECLARE_OPERATORS_FOR_FLAGS(ENumberFormat) enum ESizeFormat { - SF_QUANTITY, /**< Base 1000, usual suffixes. 1100 gets turned into "1.1K". */ - SF_BYTES, /**< Base 1024, byte suffix. 1100 gets turned into "1.07KiB". */ + SF_QUANTITY, /**< Base 1000, usual suffixes. 1100 gets turned into "1.1K". */ + SF_BYTES, /**< Base 1024, byte suffix. 1100 gets turned into "1.07KiB". */ }; namespace NFormatPrivate { template <size_t Value> - struct TLog2: std::integral_constant<size_t, TLog2<Value / 2>::value + 1> {}; + struct TLog2: std::integral_constant<size_t, TLog2<Value / 2>::value + 1> {}; template <> - struct TLog2<1>: std::integral_constant<size_t, 0> {}; + struct TLog2<1>: std::integral_constant<size_t, 0> {}; static inline void WriteChars(IOutputStream& os, char c, size_t count) { if (count == 0) @@ -105,29 +105,29 @@ namespace NFormatPrivate { template <typename T, size_t Base> using TUnsignedBaseNumber = TBaseNumber<std::make_unsigned_t<std::remove_cv_t<T>>, Base>; - + template <typename T, size_t Base> IOutputStream& operator<<(IOutputStream& stream, const TBaseNumber<T, Base>& value) { char buf[8 * sizeof(T) + 1]; /* Add 1 for sign. */ TStringBuf str(buf, IntToString<Base>(value.Value, buf, sizeof(buf))); - + if (str[0] == '-') { stream << '-'; str.Skip(1); } - + if (value.Flags & HF_ADDX) { if (Base == 16) { stream << TStringBuf("0x"); } else if (Base == 2) { stream << TStringBuf("0b"); } - } - + } + if (value.Flags & HF_FULL) { WriteChars(stream, '0', (8 * sizeof(T) + TLog2<Base>::value - 1) / TLog2<Base>::value - str.size()); - } - + } + stream << str; return stream; } @@ -138,8 +138,8 @@ namespace NFormatPrivate { inline TBaseText(const TBasicStringBuf<Char> text) : Text(text) - { - } + { + } }; template <typename Char, size_t Base> @@ -153,7 +153,7 @@ namespace NFormatPrivate { return os; } - template <typename T> + template <typename T> struct TFloatPrecision { using TdVal = std::remove_cv_t<T>; static_assert(std::is_floating_point<TdVal>::value, "expect std::is_floating_point<TdVal>::value"); @@ -163,7 +163,7 @@ namespace NFormatPrivate { int NDigits; }; - template <typename T> + template <typename T> IOutputStream& operator<<(IOutputStream& o, const TFloatPrecision<T>& prec) { char buf[512]; size_t count = FloatToString(prec.Value, buf, sizeof(buf), prec.Mode, prec.NDigits); @@ -176,8 +176,8 @@ namespace NFormatPrivate { constexpr THumanReadableDuration(const TDuration& value) : Value(value) - { - } + { + } }; struct THumanReadableSize { @@ -208,8 +208,8 @@ static constexpr ::NFormatPrivate::TLeftPad<T> LeftPad(const T& value, const siz return ::NFormatPrivate::TLeftPad<T>(value, width, padc); } -template <typename T, int N> -static constexpr ::NFormatPrivate::TLeftPad<const T*> LeftPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept { +template <typename T, int N> +static constexpr ::NFormatPrivate::TLeftPad<const T*> LeftPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept { return ::NFormatPrivate::TLeftPad<const T*>(value, width, padc); } @@ -234,8 +234,8 @@ static constexpr ::NFormatPrivate::TRightPad<T> RightPad(const T& value, const s return ::NFormatPrivate::TRightPad<T>(value, width, padc); } -template <typename T, int N> -static constexpr ::NFormatPrivate::TRightPad<const T*> RightPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept { +template <typename T, int N> +static constexpr ::NFormatPrivate::TRightPad<const T*> RightPad(const T (&value)[N], const size_t width, const char padc = ' ') noexcept { return ::NFormatPrivate::TRightPad<const T*>(value, width, padc); } @@ -397,7 +397,7 @@ static constexpr ::NFormatPrivate::THumanReadableDuration HumanReadable(const TD * @param format Format to use. */ static constexpr ::NFormatPrivate::THumanReadableSize HumanReadableSize(const double size, ESizeFormat format) noexcept { - return {size, format}; + return {size, format}; } void Time(IOutputStream& l); @@ -438,7 +438,7 @@ static constexpr ::NFormatPrivate::TFloatPrecision<T> Prec(const T& value, const * @param value float or double to output. * @param ndigits Number of significant digits. */ -template <typename T> +template <typename T> static constexpr ::NFormatPrivate::TFloatPrecision<T> Prec(const T& value, const int ndigits) noexcept { return {value, PREC_NDIGITS, ndigits}; } diff --git a/util/stream/format_ut.cpp b/util/stream/format_ut.cpp index 43245aeb48..49f42db370 100644 --- a/util/stream/format_ut.cpp +++ b/util/stream/format_ut.cpp @@ -1,5 +1,5 @@ -#include "format.h" - +#include "format.h" + #include <library/cpp/testing/unittest/registar.h> #include <util/charset/wide.h> @@ -25,11 +25,11 @@ Y_UNIT_TEST_SUITE(TOutputStreamFormattingTest) { } Y_UNIT_TEST(TestTime) { - TStringStream ss; - - ss << "[" << Time << "] " - << "qwqw" << TimeHumanReadable << Endl; - } + TStringStream ss; + + ss << "[" << Time << "] " + << "qwqw" << TimeHumanReadable << Endl; + } Y_UNIT_TEST(TestHexReference) { /* diff --git a/util/stream/fwd.h b/util/stream/fwd.h index 307676c6a7..ffe45fd883 100644 --- a/util/stream/fwd.h +++ b/util/stream/fwd.h @@ -45,7 +45,7 @@ struct TEol; template <typename TEndOfToken> class TStreamTokenizer; -enum ETraceLevel: ui8; +enum ETraceLevel: ui8; class IWalkInput; @@ -54,7 +54,7 @@ struct TZLibCompressorError; struct TZLibDecompressorError; namespace ZLib { - enum StreamType: ui8; + enum StreamType: ui8; } class TZLibDecompress; diff --git a/util/stream/hex.cpp b/util/stream/hex.cpp index 1c05330504..4c225eb32a 100644 --- a/util/stream/hex.cpp +++ b/util/stream/hex.cpp @@ -1,6 +1,6 @@ #include "hex.h" -#include "output.h" +#include "output.h" #include <util/string/hex.h> void HexEncode(const void* in, size_t len, IOutputStream& out) { diff --git a/util/stream/hex_ut.cpp b/util/stream/hex_ut.cpp index 5074a0b616..914b30bac8 100644 --- a/util/stream/hex_ut.cpp +++ b/util/stream/hex_ut.cpp @@ -1,7 +1,7 @@ #include "hex.h" #include <library/cpp/testing/unittest/registar.h> -#include "str.h" +#include "str.h" Y_UNIT_TEST_SUITE(THexCodingTest) { void TestImpl(const TString& data) { diff --git a/util/stream/holder.cpp b/util/stream/holder.cpp index f5617eef58..c6ba74b162 100644 --- a/util/stream/holder.cpp +++ b/util/stream/holder.cpp @@ -1 +1 @@ -#include "holder.h" +#include "holder.h" diff --git a/util/stream/holder.h b/util/stream/holder.h index c60a4e510c..0f8b58c8c7 100644 --- a/util/stream/holder.h +++ b/util/stream/holder.h @@ -10,16 +10,16 @@ class IOutputStream; namespace NPrivate { template <class Stream, bool isInput = std::is_base_of<IInputStream, Stream>::value> - struct TStreamBase { + struct TStreamBase { using TType = IInputStream; - }; + }; - template <class Stream> - struct TStreamBase<Stream, false> { + template <class Stream> + struct TStreamBase<Stream, false> { using TType = IOutputStream; - }; + }; -} +} /** * An ownership-gaining wrapper for proxy streams. @@ -33,7 +33,7 @@ namespace NPrivate { * was constructed on top of. */ template <class Base, class StreamBase = typename ::NPrivate::TStreamBase<Base>::TType> -class THoldingStream: private THolder<StreamBase>, public Base { +class THoldingStream: private THolder<StreamBase>, public Base { public: template <class... Args> inline THoldingStream(THolder<StreamBase> stream, Args&&... args) diff --git a/util/stream/input.cpp b/util/stream/input.cpp index 6e8170f2f9..11785b1e9e 100644 --- a/util/stream/input.cpp +++ b/util/stream/input.cpp @@ -1,45 +1,45 @@ -#include "input.h" +#include "input.h" #include "output.h" #include "str.h" - + #include <util/charset/wide.h> -#include <util/memory/tempbuf.h> +#include <util/memory/tempbuf.h> #include <util/generic/string.h> #include <util/generic/yexception.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> #include <util/string/cast.h> -#include <util/system/compat.h> -#include <util/system/spinlock.h> - +#include <util/system/compat.h> +#include <util/system/spinlock.h> + #include <cstdlib> IInputStream::IInputStream() noexcept = default; - + IInputStream::~IInputStream() = default; - + size_t IInputStream::DoReadTo(TString& st, char to) { - char ch; - - if (!Read(&ch, 1)) { + char ch; + + if (!Read(&ch, 1)) { return 0; - } - - st.clear(); - + } + + st.clear(); + size_t result = 0; - do { + do { ++result; - if (ch == to) { - break; - } - - st += ch; - } while (Read(&ch, 1)); - + if (ch == to) { + break; + } + + st += ch; + } while (Read(&ch, 1)); + return result; -} - +} + ui64 IInputStream::DoReadAll(IOutputStream& out) { TTempBuf buffer; void* ptr = buffer.Data(); @@ -55,22 +55,22 @@ ui64 IInputStream::DoReadAll(IOutputStream& out) { } size_t IInputStream::Load(void* buf_in, size_t len) { - char* buf = (char*)buf_in; - - while (len) { - const size_t ret = Read(buf, len); - - buf += ret; - len -= ret; - - if (ret == 0) { - break; - } - } - - return buf - (char*)buf_in; -} - + char* buf = (char*)buf_in; + + while (len) { + const size_t ret = Read(buf, len); + + buf += ret; + len -= ret; + + if (ret == 0) { + break; + } + } + + return buf - (char*)buf_in; +} + void IInputStream::LoadOrFail(void* buf, size_t len) { const size_t realLen = Load(buf, len); if (Y_UNLIKELY(realLen != len)) { @@ -80,14 +80,14 @@ void IInputStream::LoadOrFail(void* buf, size_t len) { size_t IInputStream::ReadLine(TString& st) { const size_t ret = ReadTo(st, '\n'); - + if (ret && !st.empty() && st.back() == '\r') { - st.pop_back(); - } - + st.pop_back(); + } + return ret; -} - +} + size_t IInputStream::ReadLine(TUtf16String& w) { TString s; size_t result = ReadLine(s); @@ -101,46 +101,46 @@ size_t IInputStream::ReadLine(TUtf16String& w) { TString IInputStream::ReadLine() { TString ret; - - if (!ReadLine(ret)) { + + if (!ReadLine(ret)) { ythrow yexception() << "can not read line from stream"; - } - - return ret; -} - + } + + return ret; +} + TString IInputStream::ReadTo(char ch) { TString ret; - - if (!ReadTo(ret, ch)) { + + if (!ReadTo(ret, ch)) { ythrow yexception() << "can not read from stream"; - } - - return ret; -} - + } + + return ret; +} + size_t IInputStream::Skip(size_t sz) { return DoSkip(sz); } size_t IInputStream::DoSkip(size_t sz) { - if (sz < 128) { - return Load(alloca(sz), sz); - } - + if (sz < 128) { + return Load(alloca(sz), sz); + } + TTempBuf buf; size_t total = 0; - - while (sz) { - const size_t lresult = Read(buf.Data(), Min<size_t>(sz, buf.Size())); - - if (lresult == 0) { + + while (sz) { + const size_t lresult = Read(buf.Data(), Min<size_t>(sz, buf.Size())); + + if (lresult == 0) { return total; - } - + } + total += lresult; sz -= lresult; - } + } return total; } @@ -148,115 +148,115 @@ size_t IInputStream::DoSkip(size_t sz) { TString IInputStream::ReadAll() { TString result; TStringOutput stream(result); - + DoReadAll(stream); return result; } - + ui64 IInputStream::ReadAll(IOutputStream& out) { return DoReadAll(out); -} - +} + ui64 TransferData(IInputStream* in, IOutputStream* out) { return in->ReadAll(*out); } -namespace { +namespace { struct TStdIn: public IInputStream { ~TStdIn() override = default; - + size_t DoRead(void* buf, size_t len) override { - const size_t ret = fread(buf, 1, len, F_); - - if (ret < len && ferror(F_)) { + const size_t ret = fread(buf, 1, len, F_); + + if (ret < len && ferror(F_)) { ythrow TSystemError() << "can not read from stdin"; - } - - return ret; - } - - FILE* F_ = stdin; - }; - -#if defined(_win_) - using TGetLine = TStdIn; -#else - #if defined(_bionic_) - using TGetLineBase = TStdIn; - #else - struct TGetLineBase: public TStdIn { - ~TGetLineBase() override { - free(B_); - } - + } + + return ret; + } + + FILE* F_ = stdin; + }; + +#if defined(_win_) + using TGetLine = TStdIn; +#else + #if defined(_bionic_) + using TGetLineBase = TStdIn; + #else + struct TGetLineBase: public TStdIn { + ~TGetLineBase() override { + free(B_); + } + size_t DoReadTo(TString& st, char ch) override { - auto&& guard = Guard(M_); - - (void)guard; - - const auto r = getdelim(&B_, &L_, ch, F_); - - if (r < 0) { - if (ferror(F_)) { - ythrow TSystemError() << "can not read from stdin"; + auto&& guard = Guard(M_); + + (void)guard; + + const auto r = getdelim(&B_, &L_, ch, F_); + + if (r < 0) { + if (ferror(F_)) { + ythrow TSystemError() << "can not read from stdin"; } - - st.clear(); - - return 0; + + st.clear(); + + return 0; } - - st.AssignNoAlias(B_, r); - + + st.AssignNoAlias(B_, r); + if (st && st.back() == ch) { - st.pop_back(); + st.pop_back(); } - - return r; + + return r; } - TAdaptiveLock M_; - char* B_ = nullptr; - size_t L_ = 0; - }; - #endif - - #if defined(_glibc_) || defined(_cygwin_) + TAdaptiveLock M_; + char* B_ = nullptr; + size_t L_ = 0; + }; + #endif + + #if defined(_glibc_) || defined(_cygwin_) // glibc does not have fgetln - using TGetLine = TGetLineBase; - #else - struct TGetLine: public TGetLineBase { + using TGetLine = TGetLineBase; + #else + struct TGetLine: public TGetLineBase { size_t DoReadTo(TString& st, char ch) override { - if (ch == '\n') { - size_t len = 0; - auto r = fgetln(F_, &len); - - if (r) { - st.AssignNoAlias(r, len); - - if (st && st.back() == '\n') { - st.pop_back(); - } - - return len; - } - } - - return TGetLineBase::DoReadTo(st, ch); - } - }; - #endif -#endif -} - + if (ch == '\n') { + size_t len = 0; + auto r = fgetln(F_, &len); + + if (r) { + st.AssignNoAlias(r, len); + + if (st && st.back() == '\n') { + st.pop_back(); + } + + return len; + } + } + + return TGetLineBase::DoReadTo(st, ch); + } + }; + #endif +#endif +} + IInputStream& NPrivate::StdInStream() noexcept { - return *SingletonWithPriority<TGetLine, 4>(); -} + return *SingletonWithPriority<TGetLine, 4>(); +} // implementation of >> operator -// helper functions +// helper functions static inline bool IsStdDelimiter(char c) { return (c == '\0') || (c == ' ') || (c == '\r') || (c == '\n') || (c == '\t'); @@ -295,13 +295,13 @@ void In<TUtf16String>(IInputStream& i, TUtf16String& w) { } } -// specialization for char types +// specialization for char types -#define SPEC_FOR_CHAR(T) \ - template <> \ +#define SPEC_FOR_CHAR(T) \ + template <> \ void In<T>(IInputStream & i, T & t) { \ - i.ReadChar((char&)t); \ - } + i.ReadChar((char&)t); \ + } SPEC_FOR_CHAR(char) SPEC_FOR_CHAR(unsigned char) @@ -311,22 +311,22 @@ SPEC_FOR_CHAR(signed char) // specialization for number types -#define SPEC_FOR_NUMBER(T) \ - template <> \ +#define SPEC_FOR_NUMBER(T) \ + template <> \ void In<T>(IInputStream & i, T & t) { \ - char buf[128]; \ - size_t pos = 0; \ - while (i.ReadChar(buf[0])) { \ - if (!IsStdDelimiter(buf[0])) { \ - ++pos; \ - break; \ - } \ - } \ - while (i.ReadChar(buf[pos]) && !IsStdDelimiter(buf[pos]) && pos < 127) { \ - ++pos; \ - } \ - t = FromString<T, char>(buf, pos); \ - } + char buf[128]; \ + size_t pos = 0; \ + while (i.ReadChar(buf[0])) { \ + if (!IsStdDelimiter(buf[0])) { \ + ++pos; \ + break; \ + } \ + } \ + while (i.ReadChar(buf[pos]) && !IsStdDelimiter(buf[pos]) && pos < 127) { \ + ++pos; \ + } \ + t = FromString<T, char>(buf, pos); \ + } SPEC_FOR_NUMBER(signed short) SPEC_FOR_NUMBER(signed int) diff --git a/util/stream/input.h b/util/stream/input.h index f0d5807ed2..59e3c08d80 100644 --- a/util/stream/input.h +++ b/util/stream/input.h @@ -1,9 +1,9 @@ #pragma once - + #include <util/generic/fwd.h> #include <util/generic/noncopyable.h> -#include <util/system/defaults.h> - +#include <util/system/defaults.h> + class IOutputStream; /** @@ -15,16 +15,16 @@ class IOutputStream; * Abstract input stream. */ class IInputStream: public TNonCopyable { -public: +public: IInputStream() noexcept; virtual ~IInputStream(); - + IInputStream(IInputStream&&) noexcept { - } + } IInputStream& operator=(IInputStream&&) noexcept { return *this; - } + } /** * Reads some data from the stream. Note that this function might read less @@ -36,13 +36,13 @@ public: * @returns Number of bytes that were actually read. * A return value of zero signals end of stream. */ - inline size_t Read(void* buf, size_t len) { - if (len == 0) { - return 0; - } - - return DoRead(buf, len); - } + inline size_t Read(void* buf, size_t len) { + if (len == 0) { + return 0; + } + + return DoRead(buf, len); + } /** * Reads one character from the stream. @@ -52,10 +52,10 @@ public: * A return value of false signals the end * of stream. */ - inline bool ReadChar(char& c) { - return DoRead(&c, 1) > 0; - } - + inline bool ReadChar(char& c) { + return DoRead(&c, 1) > 0; + } + /** * Reads all characters from the stream until the given character is * encountered, and stores them into the given string. The character itself @@ -67,8 +67,8 @@ public: * A return value of zero signals end of stream. */ inline size_t ReadTo(TString& st, char ch) { - return DoReadTo(st, ch); - } + return DoReadTo(st, ch); + } /** * Reads the requested amount of data from the stream. Unlike `Read`, this @@ -81,7 +81,7 @@ public: * A return value different from `len` * signals end of stream. */ - size_t Load(void* buf, size_t len); + size_t Load(void* buf, size_t len); /** * Reads the requested amount of data from the stream, or fails with an @@ -91,7 +91,7 @@ public: * @param len Number of bytes to read. * @see Load */ - void LoadOrFail(void* buf, size_t len); + void LoadOrFail(void* buf, size_t len); /** * Reads all data from this stream and returns it as a string. @@ -144,7 +144,7 @@ public: * A return value of zero signals end of stream. */ size_t ReadLine(TString& st); - + /** * Reads UTF8 encoded characters from the stream the first occurrence of '\n', * converts them into wide ones, and stores into provided string. Also handles @@ -156,7 +156,7 @@ public: * A return value of zero signals end of stream. */ size_t ReadLine(TUtf16String& w); - + /** * Skips some data from the stream without reading / copying it. Note that * this function might skip less data than what was requested. @@ -165,10 +165,10 @@ public: * @returns Number of bytes that were actually skipped. * A return value of zero signals end of stream. */ - size_t Skip(size_t len); - -protected: - /** + size_t Skip(size_t len); + +protected: + /** * Reads some data from the stream. Might read less data than what was * requested. * @@ -178,9 +178,9 @@ protected: * A return value of zero signals end of stream. * @throws yexception If IO error occurs. */ - virtual size_t DoRead(void* buf, size_t len) = 0; + virtual size_t DoRead(void* buf, size_t len) = 0; - /** + /** * Skips some data from the stream. Might skip less data than what was * requested. * @@ -189,9 +189,9 @@ protected: * A return value of zero signals end of stream. * @throws yexception If IO error occurs. */ - virtual size_t DoSkip(size_t len); + virtual size_t DoSkip(size_t len); - /** + /** * Reads all characters from the stream until the given character is * encountered, and stores them into the given string. The character itself * is read from the stream, but not stored in the string. @@ -216,8 +216,8 @@ protected: * @throws yexception If IO error occurs. */ virtual ui64 DoReadAll(IOutputStream& out); -}; - +}; + /** * Transfers all data from the given input stream into the given output stream. * @@ -269,5 +269,5 @@ namespace NPrivate { * Standard input stream. */ #define Cin (::NPrivate::StdInStream()) - + /** @} */ diff --git a/util/stream/input_ut.cpp b/util/stream/input_ut.cpp index 4a93f5458e..ca4188561f 100644 --- a/util/stream/input_ut.cpp +++ b/util/stream/input_ut.cpp @@ -7,15 +7,15 @@ #include <util/system/yassert.h> #ifdef _win_ - #include <io.h> + #include <io.h> #endif class TMockStdIn { public: TMockStdIn() : StdInCopy_(dup(0)) - { - } + { + } ~TMockStdIn() { close(StdInCopy_); } @@ -34,19 +34,19 @@ public: func(); Cin.ReadAll(); dup2(StdInCopy_, 0); - clearerr(stdin); + clearerr(stdin); } - + private: int StdInCopy_; }; class TNoInput: public IInputStream { public: - TNoInput(ui64 size) - : Size_(size) - { - } + TNoInput(ui64 size) + : Size_(size) + { + } protected: size_t DoRead(void*, size_t len) override { @@ -65,23 +65,23 @@ public: protected: void DoWrite(const void*, size_t) override { - } + } }; class TSimpleStringInput: public IInputStream { public: TSimpleStringInput(const TString& string) - : String_(string) - { - } + : String_(string) + { + } protected: size_t DoRead(void* buf, size_t len) override { Y_ASSERT(len != 0); - if (String_.empty()) { + if (String_.empty()) { return 0; - } + } *static_cast<char*>(buf) = String_[0]; String_.remove(0, 1); @@ -137,7 +137,7 @@ Y_UNIT_TEST_SUITE(TInputTest) { {{"\n", '\n'}, ""}, {{"\n\t", '\t'}, "\n"}, {{"\t\n", '\n'}, "\t"}, - {{"a\tb\n", '\t'}, "a"}}; + {{"a\tb\n", '\t'}, "a"}}; TMockStdIn stdIn; @@ -147,11 +147,11 @@ Y_UNIT_TEST_SUITE(TInputTest) { const TStringBuf expectedValue = testPair.second; stdIn.ForInput(text, - [=] { + [=] { TString value; - Cin.ReadTo(value, delim); - UNIT_ASSERT_VALUES_EQUAL(value, expectedValue); - }); + Cin.ReadTo(value, delim); + UNIT_ASSERT_VALUES_EQUAL(value, expectedValue); + }); } } } diff --git a/util/stream/ios_ut.cpp b/util/stream/ios_ut.cpp index 139f4296e5..ca1dd45e7f 100644 --- a/util/stream/ios_ut.cpp +++ b/util/stream/ios_ut.cpp @@ -1,57 +1,57 @@ #include "output.h" -#include "tokenizer.h" +#include "tokenizer.h" #include "buffer.h" #include "buffered.h" #include "walk.h" - + #include <library/cpp/testing/unittest/registar.h> - -#include <util/string/cast.h> + +#include <util/string/cast.h> #include <util/memory/tempbuf.h> -#include <util/charset/wide.h> - -#include <string> -#include <iostream> - -class TStreamsTest: public TTestBase { - UNIT_TEST_SUITE(TStreamsTest); - UNIT_TEST(TestGenericRead); - UNIT_TEST(TestGenericWrite); - UNIT_TEST(TestReadLine); - UNIT_TEST(TestMemoryStream); - UNIT_TEST(TestBufferedIO); - UNIT_TEST(TestBufferStream); - UNIT_TEST(TestStringStream); - UNIT_TEST(TestWtrokaInput); +#include <util/charset/wide.h> + +#include <string> +#include <iostream> + +class TStreamsTest: public TTestBase { + UNIT_TEST_SUITE(TStreamsTest); + UNIT_TEST(TestGenericRead); + UNIT_TEST(TestGenericWrite); + UNIT_TEST(TestReadLine); + UNIT_TEST(TestMemoryStream); + UNIT_TEST(TestBufferedIO); + UNIT_TEST(TestBufferStream); + UNIT_TEST(TestStringStream); + UNIT_TEST(TestWtrokaInput); UNIT_TEST(TestStrokaInput); UNIT_TEST(TestReadTo); - UNIT_TEST(TestWtrokaOutput); - UNIT_TEST(TestIStreamOperators); + UNIT_TEST(TestWtrokaOutput); + UNIT_TEST(TestIStreamOperators); UNIT_TEST(TestWchar16Output); UNIT_TEST(TestWchar32Output); UNIT_TEST(TestUtf16StingOutputByChars); - UNIT_TEST_SUITE_END(); - -public: - void TestGenericRead(); - void TestGenericWrite(); - void TestReadLine(); - void TestMemoryStream(); - void TestBufferedIO(); - void TestBufferStream(); - void TestStringStream(); - void TestWtrokaInput(); + UNIT_TEST_SUITE_END(); + +public: + void TestGenericRead(); + void TestGenericWrite(); + void TestReadLine(); + void TestMemoryStream(); + void TestBufferedIO(); + void TestBufferStream(); + void TestStringStream(); + void TestWtrokaInput(); void TestStrokaInput(); - void TestWtrokaOutput(); - void TestIStreamOperators(); + void TestWtrokaOutput(); + void TestIStreamOperators(); void TestReadTo(); void TestWchar16Output(); void TestWchar32Output(); void TestUtf16StingOutputByChars(); -}; - -UNIT_TEST_SUITE_REGISTRATION(TStreamsTest); - +}; + +UNIT_TEST_SUITE_REGISTRATION(TStreamsTest); + void TStreamsTest::TestIStreamOperators() { TString data("first line\r\nsecond\t\xd1\x82\xd0\xb5\xd1\x81\xd1\x82 line\r\n 1 -4 59 4320000000009999999 c\n -1.5 1e-110"); TStringInput si(data); @@ -86,22 +86,22 @@ void TStreamsTest::TestIStreamOperators() { UNIT_ASSERT_EQUAL(f2, 1e-110); } -void TStreamsTest::TestStringStream() { - TStringStream s; - - s << "qw\r\n1234" - << "\n" - << 34; - - UNIT_ASSERT_EQUAL(s.ReadLine(), "qw"); - UNIT_ASSERT_EQUAL(s.ReadLine(), "1234"); - - s << "\r\n" - << 123.1; - - UNIT_ASSERT_EQUAL(s.ReadLine(), "34"); +void TStreamsTest::TestStringStream() { + TStringStream s; + + s << "qw\r\n1234" + << "\n" + << 34; + + UNIT_ASSERT_EQUAL(s.ReadLine(), "qw"); + UNIT_ASSERT_EQUAL(s.ReadLine(), "1234"); + + s << "\r\n" + << 123.1; + + UNIT_ASSERT_EQUAL(s.ReadLine(), "34"); UNIT_ASSERT_EQUAL(s.ReadLine(), "123.1"); - + UNIT_ASSERT_EQUAL(s.Str(), "qw\r\n1234\n34\r\n123.1"); // Test stream copying @@ -117,148 +117,148 @@ void TStreamsTest::TestStringStream() { ss = s; s << "... and some trash"; UNIT_ASSERT_EQUAL(ss.Str(), "qw\r\n1234\n34\r\n123.1-666-13"); -} - -void TStreamsTest::TestGenericRead() { +} + +void TStreamsTest::TestGenericRead() { TString s("1234567890"); - TStringInput si(s); - char buf[1024]; - - UNIT_ASSERT_EQUAL(si.Read(buf, 6), 6); - UNIT_ASSERT_EQUAL(memcmp(buf, "123456", 6), 0); - UNIT_ASSERT_EQUAL(si.Read(buf, 6), 4); - UNIT_ASSERT_EQUAL(memcmp(buf, "7890", 4), 0); -} - -void TStreamsTest::TestGenericWrite() { + TStringInput si(s); + char buf[1024]; + + UNIT_ASSERT_EQUAL(si.Read(buf, 6), 6); + UNIT_ASSERT_EQUAL(memcmp(buf, "123456", 6), 0); + UNIT_ASSERT_EQUAL(si.Read(buf, 6), 4); + UNIT_ASSERT_EQUAL(memcmp(buf, "7890", 4), 0); +} + +void TStreamsTest::TestGenericWrite() { TString s; - TStringOutput so(s); - - so.Write("123456", 6); - so.Write("7890", 4); - - UNIT_ASSERT_EQUAL(s, "1234567890"); -} - -void TStreamsTest::TestReadLine() { + TStringOutput so(s); + + so.Write("123456", 6); + so.Write("7890", 4); + + UNIT_ASSERT_EQUAL(s, "1234567890"); +} + +void TStreamsTest::TestReadLine() { TString data("1234\r\n5678\nqw"); - TStringInput si(data); - - UNIT_ASSERT_EQUAL(si.ReadLine(), "1234"); - UNIT_ASSERT_EQUAL(si.ReadLine(), "5678"); - UNIT_ASSERT_EQUAL(si.ReadLine(), "qw"); -} - -void TStreamsTest::TestMemoryStream() { - char buf[1024]; - TMemoryOutput mo(buf, sizeof(buf)); - bool ehandled = false; - - try { - for (size_t i = 0; i < sizeof(buf) + 1; ++i) { - mo.Write(i % 127); - } - } catch (...) { - ehandled = true; - } - - UNIT_ASSERT_EQUAL(ehandled, true); - - for (size_t i = 0; i < sizeof(buf); ++i) { - UNIT_ASSERT_EQUAL(buf[i], (char)(i % 127)); - } -} - + TStringInput si(data); + + UNIT_ASSERT_EQUAL(si.ReadLine(), "1234"); + UNIT_ASSERT_EQUAL(si.ReadLine(), "5678"); + UNIT_ASSERT_EQUAL(si.ReadLine(), "qw"); +} + +void TStreamsTest::TestMemoryStream() { + char buf[1024]; + TMemoryOutput mo(buf, sizeof(buf)); + bool ehandled = false; + + try { + for (size_t i = 0; i < sizeof(buf) + 1; ++i) { + mo.Write(i % 127); + } + } catch (...) { + ehandled = true; + } + + UNIT_ASSERT_EQUAL(ehandled, true); + + for (size_t i = 0; i < sizeof(buf); ++i) { + UNIT_ASSERT_EQUAL(buf[i], (char)(i % 127)); + } +} + class TMyStringOutput: public IOutputStream { -public: +public: inline TMyStringOutput(TString& s, size_t buflen) noexcept - : S_(s) - , BufLen_(buflen) - { - } - + : S_(s) + , BufLen_(buflen) + { + } + ~TMyStringOutput() override = default; - + void DoWrite(const void* data, size_t len) override { - S_.Write(data, len); - UNIT_ASSERT(len < BufLen_ || ((len % BufLen_) == 0)); - } - + S_.Write(data, len); + UNIT_ASSERT(len < BufLen_ || ((len % BufLen_) == 0)); + } + void DoWriteV(const TPart* p, size_t count) override { TString s; - - for (size_t i = 0; i < count; ++i) { - s.append((const char*)p[i].buf, p[i].len); - } - + + for (size_t i = 0; i < count; ++i) { + s.append((const char*)p[i].buf, p[i].len); + } + DoWrite(s.data(), s.size()); - } - -private: - TStringOutput S_; - const size_t BufLen_; -}; - -void TStreamsTest::TestBufferedIO() { + } + +private: + TStringOutput S_; + const size_t BufLen_; +}; + +void TStreamsTest::TestBufferedIO() { TString s; - - { - const size_t buflen = 7; - TBuffered<TMyStringOutput> bo(buflen, s, buflen); - - for (size_t i = 0; i < 1000; ++i) { + + { + const size_t buflen = 7; + TBuffered<TMyStringOutput> bo(buflen, s, buflen); + + for (size_t i = 0; i < 1000; ++i) { TString str(" "); - str += ToString(i % 10); - + str += ToString(i % 10); + bo.Write(str.data(), str.size()); - } - - bo.Finish(); - } - + } + + bo.Finish(); + } + UNIT_ASSERT_EQUAL(s.size(), 2000); - - { - const size_t buflen = 11; - TBuffered<TStringInput> bi(buflen, s); - - for (size_t i = 0; i < 1000; ++i) { + + { + const size_t buflen = 11; + TBuffered<TStringInput> bi(buflen, s); + + for (size_t i = 0; i < 1000; ++i) { TString str(" "); - str += ToString(i % 10); - - char buf[3]; - - UNIT_ASSERT_EQUAL(bi.Load(buf, 2), 2); - - buf[2] = 0; - - UNIT_ASSERT_EQUAL(str, buf); - } - } - - s.clear(); - - { - const size_t buflen = 13; - TBuffered<TMyStringOutput> bo(buflen, s, buflen); + str += ToString(i % 10); + + char buf[3]; + + UNIT_ASSERT_EQUAL(bi.Load(buf, 2), 2); + + buf[2] = 0; + + UNIT_ASSERT_EQUAL(str, buf); + } + } + + s.clear(); + + { + const size_t buflen = 13; + TBuffered<TMyStringOutput> bo(buflen, s, buflen); TString f = "1234567890"; - - for (size_t i = 0; i < 10; ++i) { - f += f; - } - - for (size_t i = 0; i < 1000; ++i) { + + for (size_t i = 0; i < 10; ++i) { + f += f; + } + + for (size_t i = 0; i < 1000; ++i) { bo.Write(f.data(), i); - } - - bo.Finish(); - } -} + } + + bo.Finish(); + } +} -void TStreamsTest::TestBufferStream() { +void TStreamsTest::TestBufferStream() { TBufferStream stream; TString s = "test"; - + stream.Write(s.data(), s.size()); char buf[5]; size_t bytesRead = stream.Read(buf, 4); @@ -269,7 +269,7 @@ void TStreamsTest::TestBufferStream() { bytesRead = stream.Read(buf, 2); UNIT_ASSERT_EQUAL(2, bytesRead); UNIT_ASSERT_EQUAL(0, strncmp("te", buf, 2)); - + bytesRead = stream.Read(buf, 2); UNIT_ASSERT_EQUAL(2, bytesRead); UNIT_ASSERT_EQUAL(0, strncmp("st", buf, 2)); @@ -330,70 +330,70 @@ namespace { "", "\rone two", "123", - "\t\r "}; + "\t\r "}; void TestStreamReadTo1(IInputStream& input, const char* comment) { TString tmp; - input.ReadTo(tmp, 'c'); - UNIT_ASSERT_VALUES_EQUAL_C(tmp, "111a222b333", comment); + input.ReadTo(tmp, 'c'); + UNIT_ASSERT_VALUES_EQUAL_C(tmp, "111a222b333", comment); - char tmp2; - input.Read(&tmp2, 1); - UNIT_ASSERT_VALUES_EQUAL_C(tmp2, '4', comment); + char tmp2; + input.Read(&tmp2, 1); + UNIT_ASSERT_VALUES_EQUAL_C(tmp2, '4', comment); - input.ReadTo(tmp, '6'); - UNIT_ASSERT_VALUES_EQUAL_C(tmp, "44d555e", comment); + input.ReadTo(tmp, '6'); + UNIT_ASSERT_VALUES_EQUAL_C(tmp, "44d555e", comment); - tmp = input.ReadAll(); - UNIT_ASSERT_VALUES_EQUAL_C(tmp, "66f", comment); - } + tmp = input.ReadAll(); + UNIT_ASSERT_VALUES_EQUAL_C(tmp, "66f", comment); + } void TestStreamReadTo2(IInputStream& input, const char* comment) { TString s; - size_t i = 0; - while (input.ReadLine(s)) { - UNIT_ASSERT_C(i < Y_ARRAY_SIZE(Expected), comment); - UNIT_ASSERT_VALUES_EQUAL_C(s, Expected[i], comment); - ++i; + size_t i = 0; + while (input.ReadLine(s)) { + UNIT_ASSERT_C(i < Y_ARRAY_SIZE(Expected), comment); + UNIT_ASSERT_VALUES_EQUAL_C(s, Expected[i], comment); + ++i; } - } + } void TestStreamReadTo3(IInputStream& input, const char* comment) { - UNIT_ASSERT_VALUES_EQUAL_C(input.ReadLine(), "111a222b333c444d555e666f", comment); - } + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadLine(), "111a222b333c444d555e666f", comment); + } void TestStreamReadTo4(IInputStream& input, const char* comment) { - UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "one", comment); - UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "two", comment); - UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "three", comment); - } + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "one", comment); + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "two", comment); + UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "three", comment); + } void TestStrokaInput(IInputStream& input, const char* comment) { TString line; - ui32 i = 0; - TInstant start = Now(); - while (input.ReadLine(line)) { - ++i; + ui32 i = 0; + TInstant start = Now(); + while (input.ReadLine(line)) { + ++i; } - Cout << comment << ":" << (Now() - start).SecondsFloat() << Endl; - UNIT_ASSERT_VALUES_EQUAL(i, 100000); - } + Cout << comment << ":" << (Now() - start).SecondsFloat() << Endl; + UNIT_ASSERT_VALUES_EQUAL(i, 100000); + } - template <class T> + template <class T> void TestStreamReadTo(const TString& text, T test) { - TStringInput is(text); - test(is, "TStringInput"); + TStringInput is(text); + test(is, "TStringInput"); TMemoryInput mi(text.data(), text.size()); - test(mi, "TMemoryInput"); + test(mi, "TMemoryInput"); TBuffer b(text.data(), text.size()); - TBufferInput bi(b); - test(bi, "TBufferInput"); - TStringInput slave(text); - TBufferedInput bdi(&slave); - test(bdi, "TBufferedInput"); + TBufferInput bi(b); + test(bi, "TBufferInput"); + TStringInput slave(text); + TBufferedInput bdi(&slave); + test(bdi, "TBufferedInput"); TVector<TString> lst(1, text); - TStringListInput sli(lst); + TStringListInput sli(lst); test(sli, "IWalkInput"); - } + } } void TStreamsTest::TestReadTo() { @@ -412,18 +412,18 @@ void TStreamsTest::TestStrokaInput() { s.append(d.data(), d.size()); s.append('\n'); } - TestStreamReadTo(s, ::TestStrokaInput); + TestStreamReadTo(s, ::TestStrokaInput); } void TStreamsTest::TestWtrokaInput() { const TString s(Text); TStringInput is(s); TUtf16String w; - size_t i = 0; + size_t i = 0; while (is.ReadLine(w)) { UNIT_ASSERT(i < Y_ARRAY_SIZE(Expected)); - UNIT_ASSERT_VALUES_EQUAL(w, UTF8ToWide(Expected[i])); + UNIT_ASSERT_VALUES_EQUAL(w, UTF8ToWide(Expected[i])); ++i; } @@ -438,14 +438,14 @@ void TStreamsTest::TestWtrokaOutput() { TUtf16String w = UTF8ToWide(Expected[i]); os << w; - - if (i == 1 || i == 5 || i == 8) { + + if (i == 1 || i == 5 || i == 8) { os << '\r'; - } - - if (i < n - 1) { + } + + if (i < n - 1) { os << '\n'; - } + } } UNIT_ASSERT(s == Text); @@ -455,28 +455,28 @@ void TStreamsTest::TestWchar16Output() { TString s; TStringOutput os(s); os << wchar16(97); // latin a - os << u'\u044E'; // cyrillic ю + os << u'\u044E'; // cyrillic ю os << u'я'; os << wchar16(0xD801); // high surrogate is printed as replacement character U+FFFD os << u'b'; - UNIT_ASSERT_VALUES_EQUAL(s, "aюя" - "\xEF\xBF\xBD" - "b"); + UNIT_ASSERT_VALUES_EQUAL(s, "aюя" + "\xEF\xBF\xBD" + "b"); } void TStreamsTest::TestWchar32Output() { TString s; TStringOutput os(s); os << wchar32(97); // latin a - os << U'\u044E'; // cyrillic ю + os << U'\u044E'; // cyrillic ю os << U'я'; os << U'\U0001F600'; // grinning face os << u'b'; - UNIT_ASSERT_VALUES_EQUAL(s, "aюя" - "\xF0\x9F\x98\x80" - "b"); + UNIT_ASSERT_VALUES_EQUAL(s, "aюя" + "\xF0\x9F\x98\x80" + "b"); } void TStreamsTest::TestUtf16StingOutputByChars() { diff --git a/util/stream/labeled.cpp b/util/stream/labeled.cpp index 56a886ca30..6d709c302e 100644 --- a/util/stream/labeled.cpp +++ b/util/stream/labeled.cpp @@ -1 +1 @@ -#include "labeled.h" +#include "labeled.h" diff --git a/util/stream/labeled.h b/util/stream/labeled.h index 2cc539d241..75419357a5 100644 --- a/util/stream/labeled.h +++ b/util/stream/labeled.h @@ -15,5 +15,5 @@ */ #define LabeledOutput(...) "" Y_PASS_VA_ARGS(Y_MAP_ARGS_WITH_LAST(__LABELED_OUTPUT_NONLAST__, __LABELED_OUTPUT_IMPL__, __VA_ARGS__)) -#define __LABELED_OUTPUT_IMPL__(x) << #x " = " << (x) +#define __LABELED_OUTPUT_IMPL__(x) << #x " = " << (x) #define __LABELED_OUTPUT_NONLAST__(x) __LABELED_OUTPUT_IMPL__(x) << ", " diff --git a/util/stream/length.h b/util/stream/length.h index 4d508ae24d..9ee60ae203 100644 --- a/util/stream/length.h +++ b/util/stream/length.h @@ -1,10 +1,10 @@ #pragma once - -#include "input.h" + +#include "input.h" #include "output.h" - -#include <util/generic/utility.h> - + +#include <util/generic/utility.h> + /** * Proxy input stream that can read a limited number of characters from a slave * stream. @@ -15,35 +15,35 @@ class TLengthLimitedInput: public IInputStream { public: inline TLengthLimitedInput(IInputStream* slave, ui64 length) noexcept - : Slave_(slave) - , Length_(length) - { + : Slave_(slave) + , Length_(length) + { } - + ~TLengthLimitedInput() override = default; - + inline ui64 Left() const noexcept { return Length_; } - + private: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; - + private: IInputStream* Slave_; ui64 Length_; }; - + /** * Proxy input stream that counts the number of characters read. */ class TCountingInput: public IInputStream { public: inline TCountingInput(IInputStream* slave) noexcept - : Slave_(slave) - , Count_() - { + : Slave_(slave) + , Count_() + { } ~TCountingInput() override = default; @@ -65,7 +65,7 @@ private: private: IInputStream* Slave_; ui64 Count_; -}; +}; /** * Proxy output stream that counts the number of characters written. @@ -73,9 +73,9 @@ private: class TCountingOutput: public IOutputStream { public: inline TCountingOutput(IOutputStream* slave) noexcept - : Slave_(slave) - , Count_() - { + : Slave_(slave) + , Count_() + { } ~TCountingOutput() override = default; diff --git a/util/stream/mem.cpp b/util/stream/mem.cpp index 22a3339e27..b2b23d789f 100644 --- a/util/stream/mem.cpp +++ b/util/stream/mem.cpp @@ -1,27 +1,27 @@ -#include "mem.h" - +#include "mem.h" + #include <util/generic/yexception.h> - + TMemoryInput::TMemoryInput() noexcept - : Buf_(nullptr) - , Len_(0) -{ -} - + : Buf_(nullptr) + , Len_(0) +{ +} + TMemoryInput::TMemoryInput(const void* buf, size_t len) noexcept - : Buf_((const char*)buf) - , Len_(len) -{ -} - + : Buf_((const char*)buf) + , Len_(len) +{ +} + TMemoryInput::TMemoryInput(const TStringBuf buf) noexcept : Buf_(buf.data()) , Len_(buf.size()) -{ +{ } TMemoryInput::~TMemoryInput() = default; - + size_t TMemoryInput::DoNext(const void** ptr, size_t len) { len = Min(Len_, len); @@ -29,15 +29,15 @@ size_t TMemoryInput::DoNext(const void** ptr, size_t len) { Len_ -= len; Buf_ += len; return len; -} - +} + void TMemoryInput::DoUndo(size_t len) { - Len_ += len; - Buf_ -= len; -} - + Len_ += len; + Buf_ -= len; +} + TMemoryOutput::~TMemoryOutput() = default; - + size_t TMemoryOutput::DoNext(void** ptr) { Y_ENSURE(Buf_ < End_, TStringBuf("memory output stream exhausted")); *ptr = Buf_; @@ -51,13 +51,13 @@ void TMemoryOutput::DoUndo(size_t len) { Buf_ -= len; } -void TMemoryOutput::DoWrite(const void* buf, size_t len) { +void TMemoryOutput::DoWrite(const void* buf, size_t len) { char* end = Buf_ + len; Y_ENSURE(end <= End_, TStringBuf("memory output stream exhausted")); - - memcpy(Buf_, buf, len); + + memcpy(Buf_, buf, len); Buf_ = end; -} +} void TMemoryOutput::DoWriteC(char c) { Y_ENSURE(Buf_ < End_, TStringBuf("memory output stream exhausted")); diff --git a/util/stream/mem.h b/util/stream/mem.h index 18a5d46772..9692e29d69 100644 --- a/util/stream/mem.h +++ b/util/stream/mem.h @@ -1,8 +1,8 @@ #pragma once - -#include "zerocopy.h" + +#include "zerocopy.h" #include "zerocopy_output.h" - + #include <util/generic/strbuf.h> /** @@ -28,7 +28,7 @@ public: TMemoryInput(const void* buf, size_t len) noexcept; explicit TMemoryInput(const TStringBuf buf) noexcept; ~TMemoryInput() override; - + TMemoryInput(const TMemoryInput& other) noexcept : IZeroCopyInputFastReadTo() , Buf_(other.Buf_) @@ -60,21 +60,21 @@ public: Buf_ = (const char*)buf; Len_ = len; } - + /** * @returns Whether there is more data in the stream. */ bool Exhausted() const noexcept { return !Avail(); } - + /** * @returns Number of bytes available in the stream. */ size_t Avail() const noexcept { return Len_; } - + /** * @returns Current read position in the memory block * used by this stream. @@ -93,18 +93,18 @@ public: Len_ = stream->Next(&Buf_); if (!Len_) { Reset(nullptr, 0); - } + } } - + private: size_t DoNext(const void** ptr, size_t len) override; void DoUndo(size_t len) override; - + private: const char* Buf_; size_t Len_; -}; - +}; + /** * Output stream that writes data to a memory block. */ @@ -119,12 +119,12 @@ public: * @param len Size of the memory block. */ TMemoryOutput(void* buf, size_t len) noexcept - : Buf_(static_cast<char*>(buf)) - , End_(Buf_ + len) - { - } + : Buf_(static_cast<char*>(buf)) + , End_(Buf_ + len) + { + } ~TMemoryOutput() override; - + TMemoryOutput(TMemoryOutput&&) noexcept = default; TMemoryOutput& operator=(TMemoryOutput&&) noexcept = default; @@ -137,10 +137,10 @@ public: * @param len Size of the new memory block. */ inline void Reset(void* buf, size_t len) noexcept { - Buf_ = static_cast<char*>(buf); + Buf_ = static_cast<char*>(buf); End_ = Buf_ + len; } - + /** * @returns Whether there is more space in the * stream for writing. @@ -148,7 +148,7 @@ public: inline bool Exhausted() const noexcept { return !Avail(); } - + /** * @returns Number of bytes available for writing * in the stream. @@ -156,7 +156,7 @@ public: inline size_t Avail() const noexcept { return End_ - Buf_; } - + /** * @returns Current write position in the memory block * used by this stream. @@ -164,7 +164,7 @@ public: inline char* Buf() const noexcept { return Buf_; } - + /** * @returns Pointer to the end of the memory block * used by this stream. @@ -178,25 +178,25 @@ private: void DoUndo(size_t len) override; void DoWrite(const void* buf, size_t len) override; void DoWriteC(char c) override; - + protected: char* Buf_; char* End_; -}; - +}; + /** * Memory output stream that supports changing the position of the * write pointer. * * @see TMemoryOutput */ -class TMemoryWriteBuffer: public TMemoryOutput { +class TMemoryWriteBuffer: public TMemoryOutput { public: TMemoryWriteBuffer(void* buf, size_t len) : TMemoryOutput(buf, len) , Beg_(Buf_) - { - } + { + } void Reset(void* buf, size_t len) { TMemoryOutput::Reset(buf, len); diff --git a/util/stream/mem_ut.cpp b/util/stream/mem_ut.cpp index f388ae66ac..04bb6adce0 100644 --- a/util/stream/mem_ut.cpp +++ b/util/stream/mem_ut.cpp @@ -53,17 +53,17 @@ Y_UNIT_TEST_SUITE(TestMemIO) { Y_UNIT_TEST(Write) { char buffer[20]; TMemoryOutput output(buffer, sizeof(buffer)); - output << "1" - << "22" - << "333" - << "4444" - << "55555"; + output << "1" + << "22" + << "333" + << "4444" + << "55555"; - const char* const result = "1" - "22" - "333" - "4444" - "55555"; + const char* const result = "1" + "22" + "333" + "4444" + "55555"; UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result))); } diff --git a/util/stream/multi.cpp b/util/stream/multi.cpp index b2354298a0..9ec27d9cfe 100644 --- a/util/stream/multi.cpp +++ b/util/stream/multi.cpp @@ -1,26 +1,26 @@ -#include "null.h" -#include "multi.h" - +#include "null.h" +#include "multi.h" + TMultiInput::TMultiInput(IInputStream* f, IInputStream* s) noexcept - : C_(f) - , N_(s) -{ -} - + : C_(f) + , N_(s) +{ +} + TMultiInput::~TMultiInput() = default; - -size_t TMultiInput::DoRead(void* buf, size_t len) { - const size_t ret = C_->Read(buf, len); - - if (ret) { - return ret; - } - - C_ = N_; - N_ = &Cnull; - - return C_->Read(buf, len); -} + +size_t TMultiInput::DoRead(void* buf, size_t len) { + const size_t ret = C_->Read(buf, len); + + if (ret) { + return ret; + } + + C_ = N_; + N_ = &Cnull; + + return C_->Read(buf, len); +} size_t TMultiInput::DoReadTo(TString& st, char ch) { size_t ret = C_->ReadTo(st, ch); diff --git a/util/stream/multi.h b/util/stream/multi.h index 8bfd462d99..dc969530f4 100644 --- a/util/stream/multi.h +++ b/util/stream/multi.h @@ -1,30 +1,30 @@ #pragma once - -#include "input.h" - + +#include "input.h" + /** * @addtogroup Streams_Multi * @{ - */ + */ /** * A proxy input stream that concatenates two slave streams into one. */ class TMultiInput: public IInputStream { -public: +public: TMultiInput(IInputStream* f, IInputStream* s) noexcept; ~TMultiInput() override; - -private: + +private: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; size_t DoReadTo(TString& st, char ch) override; - -private: + +private: IInputStream* C_; IInputStream* N_; -}; - +}; + /** * See also "util/stream/tee.h" for multi output. */ diff --git a/util/stream/null.cpp b/util/stream/null.cpp index 4e8b298145..22c731fd84 100644 --- a/util/stream/null.cpp +++ b/util/stream/null.cpp @@ -1,18 +1,18 @@ -#include "null.h" - -#include <util/generic/singleton.h> - +#include "null.h" + +#include <util/generic/singleton.h> + TNullIO& NPrivate::StdNullStream() noexcept { return *SingletonWithPriority<TNullIO, 4>(); -} - +} + TNullInput::TNullInput() noexcept { } TNullInput::~TNullInput() = default; size_t TNullInput::DoRead(void*, size_t) { - return 0; + return 0; } size_t TNullInput::DoSkip(size_t) { @@ -21,13 +21,13 @@ size_t TNullInput::DoSkip(size_t) { size_t TNullInput::DoNext(const void**, size_t) { return 0; -} - +} + TNullOutput::TNullOutput() noexcept = default; TNullOutput::~TNullOutput() = default; -void TNullOutput::DoWrite(const void* /*buf*/, size_t /*len*/) { +void TNullOutput::DoWrite(const void* /*buf*/, size_t /*len*/) { } TNullIO::TNullIO() noexcept { diff --git a/util/stream/null.h b/util/stream/null.h index 8c335a9a78..6d80dd3f09 100644 --- a/util/stream/null.h +++ b/util/stream/null.h @@ -1,8 +1,8 @@ #pragma once - + #include "zerocopy.h" #include "output.h" - + /** * @addtogroup Streams * @{ @@ -12,50 +12,50 @@ * Null input stream. Does nothing, contains no data. */ class TNullInput: public IZeroCopyInput { -public: +public: TNullInput() noexcept; ~TNullInput() override; - -private: + +private: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; size_t DoNext(const void** ptr, size_t len) override; -}; - +}; + /** * Null output stream. Just ignores whatever is written into it. */ class TNullOutput: public IOutputStream { -public: +public: TNullOutput() noexcept; ~TNullOutput() override; - + TNullOutput(TNullOutput&&) noexcept = default; TNullOutput& operator=(TNullOutput&&) noexcept = default; -private: +private: void DoWrite(const void* buf, size_t len) override; -}; - +}; + /** * Null input-output stream. * * @see TNullInput * @see TNullOutput */ -class TNullIO: public TNullInput, public TNullOutput { -public: +class TNullIO: public TNullInput, public TNullOutput { +public: TNullIO() noexcept; ~TNullIO() override; -}; - +}; + namespace NPrivate { TNullIO& StdNullStream() noexcept; } - + /** * Standard null stream. */ #define Cnull (::NPrivate::StdNullStream()) - + /** @} */ diff --git a/util/stream/output.cpp b/util/stream/output.cpp index db81b81b70..14eb235213 100644 --- a/util/stream/output.cpp +++ b/util/stream/output.cpp @@ -1,53 +1,53 @@ -#include "output.h" - -#include <util/string/cast.h> -#include "format.h" -#include <util/memory/tempbuf.h> -#include <util/generic/singleton.h> -#include <util/generic/yexception.h> +#include "output.h" + +#include <util/string/cast.h> +#include "format.h" +#include <util/memory/tempbuf.h> +#include <util/generic/singleton.h> +#include <util/generic/yexception.h> #include <util/charset/utf8.h> -#include <util/charset/wide.h> - +#include <util/charset/wide.h> + #if defined(_android_) - #include <util/system/dynlib.h> - #include <util/system/guard.h> - #include <util/system/mutex.h> - #include <android/log.h> + #include <util/system/dynlib.h> + #include <util/system/guard.h> + #include <util/system/mutex.h> + #include <android/log.h> #endif -#include <cerrno> -#include <string> +#include <cerrno> +#include <string> #include <string_view> -#include <cstdio> - +#include <cstdio> + #if defined(_win_) - #include <io.h> + #include <io.h> #endif constexpr size_t MAX_UTF8_BYTES = 4; // UTF-8-encoded code point takes between 1 and 4 bytes IOutputStream::IOutputStream() noexcept = default; - + IOutputStream::~IOutputStream() = default; - + void IOutputStream::DoFlush() { - /* - * do nothing - */ -} - + /* + * do nothing + */ +} + void IOutputStream::DoFinish() { - Flush(); -} - + Flush(); +} + void IOutputStream::DoWriteV(const TPart* parts, size_t count) { - for (size_t i = 0; i < count; ++i) { - const TPart& part = parts[i]; - - DoWrite(part.buf, part.len); - } -} - + for (size_t i = 0; i < count; ++i) { + const TPart& part = parts[i]; + + DoWrite(part.buf, part.len); + } +} + void IOutputStream::DoWriteC(char ch) { DoWrite(&ch, 1); } @@ -89,16 +89,16 @@ static void WriteString(IOutputStream& o, const wchar32* w, size_t n) { o.Write(data, written); } -template <> +template <> void Out<TString>(IOutputStream& o, const TString& p) { o.Write(p.data(), p.size()); -} - -template <> +} + +template <> void Out<std::string>(IOutputStream& o, const std::string& p) { - o.Write(p.data(), p.length()); -} - + o.Write(p.data(), p.length()); +} + template <> void Out<std::string_view>(IOutputStream& o, const std::string_view& p) { o.Write(p.data(), p.length()); @@ -157,50 +157,50 @@ void Out<TUtf32String>(IOutputStream& o, const TUtf32String& w) { WriteString(o, w.c_str(), w.size()); } -#define DEF_CONV_DEFAULT(type) \ - template <> \ +#define DEF_CONV_DEFAULT(type) \ + template <> \ void Out<type>(IOutputStream & o, type p) { \ - o << ToString(p); \ - } - -#define DEF_CONV_CHR(type) \ - template <> \ + o << ToString(p); \ + } + +#define DEF_CONV_CHR(type) \ + template <> \ void Out<type>(IOutputStream & o, type p) { \ - o.Write((char)p); \ - } - -#define DEF_CONV_NUM(type, len) \ - template <> \ + o.Write((char)p); \ + } + +#define DEF_CONV_NUM(type, len) \ + template <> \ void Out<type>(IOutputStream & o, type p) { \ - char buf[len]; \ - o.Write(buf, ToString(p, buf, sizeof(buf))); \ + char buf[len]; \ + o.Write(buf, ToString(p, buf, sizeof(buf))); \ } \ \ template <> \ void Out<volatile type>(IOutputStream & o, volatile type p) { \ Out<type>(o, p); \ - } - -DEF_CONV_NUM(bool, 64) - -DEF_CONV_CHR(char) -DEF_CONV_CHR(signed char) -DEF_CONV_CHR(unsigned char) - -DEF_CONV_NUM(signed short, 64) -DEF_CONV_NUM(signed int, 64) -DEF_CONV_NUM(signed long int, 64) -DEF_CONV_NUM(signed long long int, 64) - -DEF_CONV_NUM(unsigned short, 64) -DEF_CONV_NUM(unsigned int, 64) -DEF_CONV_NUM(unsigned long int, 64) -DEF_CONV_NUM(unsigned long long int, 64) - -DEF_CONV_NUM(float, 512) -DEF_CONV_NUM(double, 512) -DEF_CONV_NUM(long double, 512) - + } + +DEF_CONV_NUM(bool, 64) + +DEF_CONV_CHR(char) +DEF_CONV_CHR(signed char) +DEF_CONV_CHR(unsigned char) + +DEF_CONV_NUM(signed short, 64) +DEF_CONV_NUM(signed int, 64) +DEF_CONV_NUM(signed long int, 64) +DEF_CONV_NUM(signed long long int, 64) + +DEF_CONV_NUM(unsigned short, 64) +DEF_CONV_NUM(unsigned int, 64) +DEF_CONV_NUM(unsigned long int, 64) +DEF_CONV_NUM(unsigned long long int, 64) + +DEF_CONV_NUM(float, 512) +DEF_CONV_NUM(double, 512) +DEF_CONV_NUM(long double, 512) + #if !defined(_YNDX_LIBCXX_ENABLE_VECTOR_BOOL_COMPRESSION) || (_YNDX_LIBCXX_ENABLE_VECTOR_BOOL_COMPRESSION == 1) // TODO: acknowledge std::bitset::reference for both libc++ and libstdc++ template <> @@ -237,14 +237,14 @@ void Out<void*>(IOutputStream& o, void* t) { } using TNullPtr = decltype(nullptr); - -template <> + +template <> void Out<TNullPtr>(IOutputStream& o, TTypeTraits<TNullPtr>::TFuncParam) { o << TStringBuf("nullptr"); -} - +} + #if defined(_android_) -namespace { +namespace { class TAndroidStdIOStreams { public: TAndroidStdIOStreams() @@ -252,32 +252,32 @@ namespace { , LogFuncPtr((TLogFuncPtr)LogLibrary.Sym("__android_log_write")) , Out(LogFuncPtr) , Err(LogFuncPtr) - { - } + { + } public: - using TLogFuncPtr = void (*)(int, const char*, const char*); + using TLogFuncPtr = void (*)(int, const char*, const char*); class TAndroidStdOutput: public IOutputStream { public: inline TAndroidStdOutput(TLogFuncPtr logFuncPtr) noexcept - : Buffer() - , LogFuncPtr(logFuncPtr) - { - } + : Buffer() + , LogFuncPtr(logFuncPtr) + { + } virtual ~TAndroidStdOutput() { - } + } private: virtual void DoWrite(const void* buf, size_t len) override { - with_lock (BufferMutex) { + with_lock (BufferMutex) { Buffer.Write(buf, len); } } virtual void DoFlush() override { - with_lock (BufferMutex) { + with_lock (BufferMutex) { LogFuncPtr(ANDROID_LOG_DEBUG, GetTag(), Buffer.Data()); Buffer.Clear(); } @@ -291,15 +291,15 @@ namespace { TLogFuncPtr LogFuncPtr; }; - class TStdErr: public TAndroidStdOutput { + class TStdErr: public TAndroidStdOutput { public: TStdErr(TLogFuncPtr logFuncPtr) : TAndroidStdOutput(logFuncPtr) - { - } + { + } virtual ~TStdErr() { - } + } private: virtual const char* GetTag() const override { @@ -307,15 +307,15 @@ namespace { } }; - class TStdOut: public TAndroidStdOutput { + class TStdOut: public TAndroidStdOutput { public: TStdOut(TLogFuncPtr logFuncPtr) : TAndroidStdOutput(logFuncPtr) - { - } + { + } virtual ~TStdOut() { - } + } private: virtual const char* GetTag() const override { @@ -340,84 +340,84 @@ namespace { namespace { class TStdOutput: public IOutputStream { - public: + public: inline TStdOutput(FILE* f) noexcept - : F_(f) - { - } - + : F_(f) + { + } + ~TStdOutput() override = default; - - private: + + private: void DoWrite(const void* buf, size_t len) override { - if (len != fwrite(buf, 1, len, F_)) { -#if defined(_win_) + if (len != fwrite(buf, 1, len, F_)) { +#if defined(_win_) // On Windows, if 'F_' is console -- 'fwrite' returns count of written characters. // If, for example, console output codepage is UTF-8, then returned value is // not equal to 'len'. So, we ignore some 'errno' values... if ((errno == 0 || errno == EINVAL || errno == EILSEQ) && _isatty(fileno(F_))) { return; - } -#endif - ythrow TSystemError() << "write failed"; - } - } - + } +#endif + ythrow TSystemError() << "write failed"; + } + } + void DoFlush() override { - if (fflush(F_) != 0) { + if (fflush(F_) != 0) { ythrow TSystemError() << "fflush failed"; } - } - - private: - FILE* F_; - }; - - struct TStdIOStreams { - struct TStdErr: public TStdOutput { - inline TStdErr() - : TStdOutput(stderr) - { - } - + } + + private: + FILE* F_; + }; + + struct TStdIOStreams { + struct TStdErr: public TStdOutput { + inline TStdErr() + : TStdOutput(stderr) + { + } + ~TStdErr() override = default; - }; - - struct TStdOut: public TStdOutput { - inline TStdOut() - : TStdOutput(stdout) - { - } - + }; + + struct TStdOut: public TStdOutput { + inline TStdOut() + : TStdOutput(stdout) + { + } + ~TStdOut() override = default; - }; - - TStdOut Out; - TStdErr Err; - - static inline TStdIOStreams& Instance() { - return *SingletonWithPriority<TStdIOStreams, 4>(); - } - }; -} - + }; + + TStdOut Out; + TStdErr Err; + + static inline TStdIOStreams& Instance() { + return *SingletonWithPriority<TStdIOStreams, 4>(); + } + }; +} + IOutputStream& NPrivate::StdErrStream() noexcept { #if defined(_android_) if (TAndroidStdIOStreams::Enabled) { return TAndroidStdIOStreams::Instance().Err; } #endif - return TStdIOStreams::Instance().Err; -} - + return TStdIOStreams::Instance().Err; +} + IOutputStream& NPrivate::StdOutStream() noexcept { #if defined(_android_) if (TAndroidStdIOStreams::Enabled) { return TAndroidStdIOStreams::Instance().Out; } #endif - return TStdIOStreams::Instance().Out; -} + return TStdIOStreams::Instance().Out; +} void RedirectStdioToAndroidLog(bool redirect) { #if defined(_android_) diff --git a/util/stream/output.h b/util/stream/output.h index 00eef50b95..121208df78 100644 --- a/util/stream/output.h +++ b/util/stream/output.h @@ -1,15 +1,15 @@ #pragma once - + #include "fwd.h" #include "labeled.h" #include <util/generic/noncopyable.h> #include <util/generic/string.h> -#include <util/generic/strbuf.h> -#include <util/generic/typetraits.h> - -#include <type_traits> - +#include <util/generic/strbuf.h> +#include <util/generic/typetraits.h> + +#include <type_traits> + /** * @addtogroup Streams_Base * @{ @@ -19,44 +19,44 @@ * Abstract output stream. */ class IOutputStream: public TNonCopyable { -public: +public: /** * Data block for output. */ - struct TPart { + struct TPart { inline TPart(const void* Buf, size_t Len) noexcept - : buf(Buf) - , len(Len) - { - } - + : buf(Buf) + , len(Len) + { + } + inline TPart(const TStringBuf s) noexcept : buf(s.data()) , len(s.size()) - { - } - + { + } + inline TPart() noexcept - : buf(nullptr) - , len(0) - { - } - + : buf(nullptr) + , len(0) + { + } + inline ~TPart() = default; - + static inline TPart CrLf() noexcept { return TPart("\r\n", 2); - } - - const void* buf; - size_t len; - }; - + } + + const void* buf; + size_t len; + }; + IOutputStream() noexcept; virtual ~IOutputStream(); - + IOutputStream(IOutputStream&&) noexcept { - } + } IOutputStream& operator=(IOutputStream&&) noexcept { return *this; @@ -68,12 +68,12 @@ public: * @param buf Data to write. * @param len Number of bytes to write. */ - inline void Write(const void* buf, size_t len) { - if (len) { - DoWrite(buf, len); - } - } - + inline void Write(const void* buf, size_t len) { + if (len) { + DoWrite(buf, len); + } + } + /** * Writes a string into this stream. * @@ -81,7 +81,7 @@ public: */ inline void Write(const TStringBuf st) { Write(st.data(), st.size()); - } + } /** * Writes several data blocks into this stream. @@ -90,23 +90,23 @@ public: * array. * @param count Number of data blocks to write. */ - inline void Write(const TPart* parts, size_t count) { - if (count > 1) { - DoWriteV(parts, count); - } else if (count) { - DoWrite(parts->buf, parts->len); - } - } - + inline void Write(const TPart* parts, size_t count) { + if (count > 1) { + DoWriteV(parts, count); + } else if (count) { + DoWrite(parts->buf, parts->len); + } + } + /** * Writes a single character into this stream. * * @param ch Character to write. */ - inline void Write(char ch) { + inline void Write(char ch) { DoWriteC(ch); - } - + } + /** * Flushes this stream's buffer, if any. * @@ -115,19 +115,19 @@ public: * stream << "some string" << Flush; * @endcode */ - inline void Flush() { - DoFlush(); - } - + inline void Flush() { + DoFlush(); + } + /** * Flushes and closes this stream. No more data can be written into a stream * once it's closed. */ - inline void Finish() { - DoFinish(); - } - -protected: + inline void Finish() { + DoFinish(); + } + +protected: /** * Writes into this stream. * @@ -135,7 +135,7 @@ protected: * @param len Number of bytes to write. * @throws yexception If IO error occurs. */ - virtual void DoWrite(const void* buf, size_t len) = 0; + virtual void DoWrite(const void* buf, size_t len) = 0; /** * Writes several data blocks into this stream. @@ -145,7 +145,7 @@ protected: * @param count Number of data blocks to write. * @throws yexception If IO error occurs. */ - virtual void DoWriteV(const TPart* parts, size_t count); + virtual void DoWriteV(const TPart* parts, size_t count); /** * Writes a single character into this stream. Can be overridden with a faster implementation. @@ -159,7 +159,7 @@ protected: * * @throws yexception If IO error occurs. */ - virtual void DoFlush(); + virtual void DoFlush(); /** * Flushes and closes this stream. No more data can be written into a stream @@ -167,9 +167,9 @@ protected: * * @throws yexception If IO error occurs. */ - virtual void DoFinish(); -}; - + virtual void DoFinish(); +}; + /** * `operator<<` for `IOutputStream` by default delegates to this function. * @@ -185,60 +185,60 @@ protected: * @param out Output stream to write into. * @param value Value to write. */ -template <class T> +template <class T> void Out(IOutputStream& out, typename TTypeTraits<T>::TFuncParam value); - + #define Y_DECLARE_OUT_SPEC(MODIF, T, stream, value) \ template <> \ MODIF void Out<T>(IOutputStream & stream, TTypeTraits<T>::TFuncParam value) -template <> +template <> inline void Out<const char*>(IOutputStream& o, const char* t) { - if (t) { - o.Write(t); - } else { - o.Write("(null)"); - } -} - -template <> + if (t) { + o.Write(t); + } else { + o.Write("(null)"); + } +} + +template <> void Out<const wchar16*>(IOutputStream& o, const wchar16* w); - + template <> void Out<const wchar32*>(IOutputStream& o, const wchar32* w); static inline IOutputStream& operator<<(IOutputStream& o, TStreamManipulator m) { - m(o); - - return o; -} - + m(o); + + return o; +} + static inline IOutputStream& operator<<(IOutputStream& o, const char* t) { - Out<const char*>(o, t); - - return o; -} - + Out<const char*>(o, t); + + return o; +} + static inline IOutputStream& operator<<(IOutputStream& o, char* t) { - Out<const char*>(o, t); - - return o; -} - -template <class T> -static inline std::enable_if_t<std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, T t) { - Out<T>(o, t); - - return o; -} - -template <class T> -static inline std::enable_if_t<!std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, const T& t) { - Out<T>(o, t); - - return o; -} - + Out<const char*>(o, t); + + return o; +} + +template <class T> +static inline std::enable_if_t<std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, T t) { + Out<T>(o, t); + + return o; +} + +template <class T> +static inline std::enable_if_t<!std::is_scalar<T>::value, IOutputStream&> operator<<(IOutputStream& o, const T& t) { + Out<T>(o, t); + + return o; +} + static inline IOutputStream& operator<<(IOutputStream& o, const wchar16* t) { Out<const wchar16*>(o, t); return o; @@ -277,15 +277,15 @@ namespace NPrivate { /** * Standard log stream. */ -#define Clog Cerr - +#define Clog Cerr + /** * End-of-line output manipulator, basically the same as `std::endl`. - */ + */ static inline void Endl(IOutputStream& o) { (o << '\n').Flush(); -} - +} + /** * Flushing stream manipulator, basically the same as `std::flush`. */ @@ -293,12 +293,12 @@ static inline void Flush(IOutputStream& o) { o.Flush(); } -/* +/* * Also see format.h for additional manipulators. */ -#include "debug.h" - +#include "debug.h" + void RedirectStdioToAndroidLog(bool redirect); /** @} */ diff --git a/util/stream/pipe.cpp b/util/stream/pipe.cpp index 51be1934a7..1487182aa2 100644 --- a/util/stream/pipe.cpp +++ b/util/stream/pipe.cpp @@ -1,38 +1,38 @@ -#include "pipe.h" - -#include <util/generic/yexception.h> - -#include <cstdio> -#include <cerrno> - -class TPipeBase::TImpl { -public: +#include "pipe.h" + +#include <util/generic/yexception.h> + +#include <cstdio> +#include <cerrno> + +class TPipeBase::TImpl { +public: inline TImpl(const TString& command, const char* mode) : Pipe_(nullptr) - { + { #ifndef _freebsd_ - if (strcmp(mode, "r+") == 0) { - ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD"; - } + if (strcmp(mode, "r+") == 0) { + ythrow TSystemError(EINVAL) << "pipe \"r+\" mode is implemented only on FreeBSD"; + } #endif Pipe_ = ::popen(command.data(), mode); if (Pipe_ == nullptr) { ythrow TSystemError() << "failed to open pipe: " << command.Quote(); - } - } - + } + } + inline ~TImpl() { if (Pipe_ != nullptr) { - ::pclose(Pipe_); - } - } - -public: - FILE* Pipe_; -}; - + ::pclose(Pipe_); + } + } + +public: + FILE* Pipe_; +}; + TPipeBase::TPipeBase(const TString& command, const char* mode) - : Impl_(new TImpl(command, mode)) + : Impl_(new TImpl(command, mode)) { } @@ -45,20 +45,20 @@ TPipeInput::TPipeInput(const TString& command) size_t TPipeInput::DoRead(void* buf, size_t len) { if (Impl_->Pipe_ == nullptr) { - return 0; - } - - size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_); - if (bytesRead == 0) { - int exitStatus = ::pclose(Impl_->Pipe_); + return 0; + } + + size_t bytesRead = ::fread(buf, 1, len, Impl_->Pipe_); + if (bytesRead == 0) { + int exitStatus = ::pclose(Impl_->Pipe_); Impl_->Pipe_ = nullptr; - if (exitStatus == -1) { - ythrow TSystemError() << "pclose() failed"; - } else if (exitStatus != 0) { - ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")"; - } - } - return bytesRead; + if (exitStatus == -1) { + ythrow TSystemError() << "pclose() failed"; + } else if (exitStatus != 0) { + ythrow yexception() << "subprocess exited with non-zero status(" << exitStatus << ")"; + } + } + return bytesRead; } TPipeOutput::TPipeOutput(const TString& command) @@ -68,8 +68,8 @@ TPipeOutput::TPipeOutput(const TString& command) void TPipeOutput::DoWrite(const void* buf, size_t len) { if (Impl_->Pipe_ == nullptr || len != ::fwrite(buf, 1, len, Impl_->Pipe_)) { - ythrow TSystemError() << "fwrite failed"; - } + ythrow TSystemError() << "fwrite failed"; + } } void TPipeOutput::Close() { @@ -88,9 +88,9 @@ TPipedBase::TPipedBase(PIPEHANDLE fd) } TPipedBase::~TPipedBase() { - if (Handle_.IsOpen()) { + if (Handle_.IsOpen()) { Handle_.Close(); - } + } } TPipedInput::TPipedInput(PIPEHANDLE fd) @@ -101,9 +101,9 @@ TPipedInput::TPipedInput(PIPEHANDLE fd) TPipedInput::~TPipedInput() = default; size_t TPipedInput::DoRead(void* buf, size_t len) { - if (!Handle_.IsOpen()) { + if (!Handle_.IsOpen()) { return 0; - } + } return Handle_.Read(buf, len); } diff --git a/util/stream/pipe.h b/util/stream/pipe.h index 18525b9517..3f501eb1b3 100644 --- a/util/stream/pipe.h +++ b/util/stream/pipe.h @@ -1,12 +1,12 @@ #pragma once - + #include "input.h" #include "output.h" - -#include <util/system/pipe.h> + +#include <util/system/pipe.h> #include <util/generic/ptr.h> #include <util/generic/string.h> - + /** * @addtogroup Streams_Pipes * @{ @@ -16,7 +16,7 @@ * Base class for starting a process and communicating with it via pipes. */ class TPipeBase { -protected: +protected: /** * Starts a new process and opens a pipe. * @@ -27,9 +27,9 @@ protected: TPipeBase(const TString& command, const char* mode); virtual ~TPipeBase(); -protected: - class TImpl; - THolder<TImpl> Impl_; +protected: + class TImpl; + THolder<TImpl> Impl_; }; /** @@ -38,8 +38,8 @@ protected: * Note that if the process ends with non-zero exit status, `Read` function will * throw an exception. */ -class TPipeInput: protected TPipeBase, public IInputStream { -public: +class TPipeInput: protected TPipeBase, public IInputStream { +public: /** * Starts a new process and opens a pipe. * @@ -47,7 +47,7 @@ public: */ TPipeInput(const TString& command); -private: +private: size_t DoRead(void* buf, size_t len) override; }; @@ -57,8 +57,8 @@ private: * Note that if the process ends with non-zero exit status, `Close` function will * throw an exception. */ -class TPipeOutput: protected TPipeBase, public IOutputStream { -public: +class TPipeOutput: protected TPipeBase, public IOutputStream { +public: /** * Starts a new process and opens a pipe. * @@ -70,30 +70,30 @@ public: * Waits for the process to terminate and throws an exception if it ended * with a non-zero exit status. */ - void Close(); + void Close(); -private: +private: void DoWrite(const void* buf, size_t len) override; }; class TPipedBase { -protected: - TPipedBase(PIPEHANDLE fd); +protected: + TPipedBase(PIPEHANDLE fd); virtual ~TPipedBase(); -protected: - TPipeHandle Handle_; +protected: + TPipeHandle Handle_; }; /** * Input stream that binds to a standard output stream of an existing process. */ class TPipedInput: public TPipedBase, public IInputStream { -public: - TPipedInput(PIPEHANDLE fd); +public: + TPipedInput(PIPEHANDLE fd); ~TPipedInput() override; -private: +private: size_t DoRead(void* buf, size_t len) override; }; @@ -101,11 +101,11 @@ private: * Output stream that binds to a standard input stream of an existing process. */ class TPipedOutput: public TPipedBase, public IOutputStream { -public: - TPipedOutput(PIPEHANDLE fd); +public: + TPipedOutput(PIPEHANDLE fd); ~TPipedOutput() override; -private: +private: void DoWrite(const void* buf, size_t len) override; }; diff --git a/util/stream/printf.cpp b/util/stream/printf.cpp index f3eeca7afc..a7f7c0db4f 100644 --- a/util/stream/printf.cpp +++ b/util/stream/printf.cpp @@ -1,51 +1,51 @@ #include "output.h" -#include "printf.h" - -#include <util/generic/scope.h> -#include <util/memory/tempbuf.h> -#include <util/generic/yexception.h> - +#include "printf.h" + +#include <util/generic/scope.h> +#include <util/memory/tempbuf.h> +#include <util/generic/yexception.h> + size_t Printf(IOutputStream& out, const char* fmt, ...) { - va_list lst; - va_start(lst, fmt); - - Y_DEFER { - va_end(lst); - }; - - return Printf(out, fmt, lst); -} - + va_list lst; + va_start(lst, fmt); + + Y_DEFER { + va_end(lst); + }; + + return Printf(out, fmt, lst); +} + static inline size_t TryPrintf(void* ptr, size_t len, IOutputStream& out, const char* fmt, va_list params) { - va_list lst; - va_copy(lst, params); - const int ret = vsnprintf((char*)ptr, len, fmt, lst); - va_end(lst); - - if (ret < 0) { - return len; - } - - if ((size_t)ret < len) { - out.Write(ptr, (size_t)ret); - } - - return (size_t)ret; -} - + va_list lst; + va_copy(lst, params); + const int ret = vsnprintf((char*)ptr, len, fmt, lst); + va_end(lst); + + if (ret < 0) { + return len; + } + + if ((size_t)ret < len) { + out.Write(ptr, (size_t)ret); + } + + return (size_t)ret; +} + size_t Printf(IOutputStream& out, const char* fmt, va_list params) { - size_t guess = 0; - - while (true) { - TTempBuf tmp(guess); - const size_t ret = TryPrintf(tmp.Data(), tmp.Size(), out, fmt, params); - - if (ret < tmp.Size()) { - return ret; - } - - guess = Max(tmp.Size() * 2, ret + 1); - } - - return 0; -} + size_t guess = 0; + + while (true) { + TTempBuf tmp(guess); + const size_t ret = TryPrintf(tmp.Data(), tmp.Size(), out, fmt, params); + + if (ret < tmp.Size()) { + return ret; + } + + guess = Max(tmp.Size() * 2, ret + 1); + } + + return 0; +} diff --git a/util/stream/printf.h b/util/stream/printf.h index 1c7ddc0664..7a24a37530 100644 --- a/util/stream/printf.h +++ b/util/stream/printf.h @@ -1,9 +1,9 @@ #pragma once - -#include <util/system/compat.h> - + +#include <util/system/compat.h> + class IOutputStream; - + /** * Stream-based `printf` function. Prints formatted data into the provided stream. * Works the same way as a standard C `printf`. diff --git a/util/stream/printf_ut.cpp b/util/stream/printf_ut.cpp index 0eab167062..fa0e7b5ff0 100644 --- a/util/stream/printf_ut.cpp +++ b/util/stream/printf_ut.cpp @@ -1,33 +1,33 @@ #include "null.h" -#include "printf.h" +#include "printf.h" #include "str.h" - + #include <util/generic/string.h> #include <library/cpp/testing/unittest/registar.h> - + Y_UNIT_TEST_SUITE(TStreamPrintfTest) { Y_UNIT_TEST(TestPrintf) { - TStringStream ss; - - UNIT_ASSERT_EQUAL(Printf(ss, "qw %s %d", "er", 1), 7); - UNIT_ASSERT_EQUAL(ss.Str(), "qw er 1"); - } - + TStringStream ss; + + UNIT_ASSERT_EQUAL(Printf(ss, "qw %s %d", "er", 1), 7); + UNIT_ASSERT_EQUAL(ss.Str(), "qw er 1"); + } + #ifdef __GNUC__ - #pragma GCC diagnostic ignored "-Wformat-zero-length" + #pragma GCC diagnostic ignored "-Wformat-zero-length" #endif // __GNUC__ Y_UNIT_TEST(TestZeroString) { UNIT_ASSERT_EQUAL(Printf(Cnull, ""), 0); - } - + } + Y_UNIT_TEST(TestLargePrintf) { TString s = NUnitTest::RandomString(1000000); - TStringStream ss; - + TStringStream ss; + Printf(ss, "%s", s.data()); - - UNIT_ASSERT_EQUAL(ss.Str(), s); - } -} + + UNIT_ASSERT_EQUAL(ss.Str(), s); + } +} diff --git a/util/stream/str.cpp b/util/stream/str.cpp index 13f0e8ef28..3be3435db5 100644 --- a/util/stream/str.cpp +++ b/util/stream/str.cpp @@ -1,9 +1,9 @@ -#include "str.h" - +#include "str.h" + static constexpr size_t MIN_BUFFER_GROW_SIZE = 16; TStringInput::~TStringInput() = default; - + size_t TStringInput::DoNext(const void** ptr, size_t len) { len = Min(len, S_->size() - Pos_); *ptr = S_->data() + Pos_; @@ -17,28 +17,28 @@ void TStringInput::DoUndo(size_t len) { } TStringOutput::~TStringOutput() = default; - + size_t TStringOutput::DoNext(void** ptr) { - if (S_->size() == S_->capacity()) { - S_->reserve(FastClp2(S_->capacity() + MIN_BUFFER_GROW_SIZE)); + if (S_->size() == S_->capacity()) { + S_->reserve(FastClp2(S_->capacity() + MIN_BUFFER_GROW_SIZE)); } - size_t previousSize = S_->size(); - ResizeUninitialized(*S_, S_->capacity()); - *ptr = S_->begin() + previousSize; - return S_->size() - previousSize; + size_t previousSize = S_->size(); + ResizeUninitialized(*S_, S_->capacity()); + *ptr = S_->begin() + previousSize; + return S_->size() - previousSize; } void TStringOutput::DoUndo(size_t len) { - Y_VERIFY(len <= S_->size(), "trying to undo more bytes than actually written"); - S_->resize(S_->size() - len); -} - -void TStringOutput::DoWrite(const void* buf, size_t len) { - S_->append((const char*)buf, len); + Y_VERIFY(len <= S_->size(), "trying to undo more bytes than actually written"); + S_->resize(S_->size() - len); } +void TStringOutput::DoWrite(const void* buf, size_t len) { + S_->append((const char*)buf, len); +} + void TStringOutput::DoWriteC(char c) { - S_->push_back(c); + S_->push_back(c); } TStringStream::~TStringStream() = default; diff --git a/util/stream/str.h b/util/stream/str.h index 028bd572c0..fc45baec27 100644 --- a/util/stream/str.h +++ b/util/stream/str.h @@ -1,12 +1,12 @@ #pragma once - + #include "zerocopy.h" #include "zerocopy_output.h" - + #include <util/generic/string.h> #include <util/generic/noncopyable.h> #include <util/generic/store_policy.h> - + /** * @addtogroup Streams_Strings * @{ @@ -16,7 +16,7 @@ * Input stream for reading data from a string. */ class TStringInput: public IZeroCopyInputFastReadTo { -public: +public: /** * Constructs a string input stream that reads character data from the * provided string. @@ -31,38 +31,38 @@ public: */ inline TStringInput(const TString& s) noexcept : S_(&s) - , Pos_(0) - { - } - + , Pos_(0) + { + } + TStringInput(const TString&&) = delete; ~TStringInput() override; - + TStringInput(TStringInput&&) noexcept = default; TStringInput& operator=(TStringInput&&) noexcept = default; - inline void Swap(TStringInput& s) noexcept { - DoSwap(S_, s.S_); - DoSwap(Pos_, s.Pos_); - } - + inline void Swap(TStringInput& s) noexcept { + DoSwap(S_, s.S_); + DoSwap(Pos_, s.Pos_); + } + protected: size_t DoNext(const void** ptr, size_t len) override; void DoUndo(size_t len) override; - -private: + +private: const TString* S_; - size_t Pos_; + size_t Pos_; friend class TStringStream; -}; - +}; + /** * Stream for writing data into a string. */ class TStringOutput: public IZeroCopyOutput { -public: +public: /** * Constructs a string output stream that appends character data to the * provided string. @@ -74,74 +74,74 @@ public: * @param s String to append to. */ inline TStringOutput(TString& s) noexcept - : S_(&s) - { - } - - TStringOutput(TStringOutput&& s) noexcept = default; + : S_(&s) + { + } + + TStringOutput(TStringOutput&& s) noexcept = default; ~TStringOutput() override; - + /** * @param size Number of additional characters to * reserve in output string. */ inline void Reserve(size_t size) { - S_->reserve(S_->size() + size); - } - - inline void Swap(TStringOutput& s) noexcept { - DoSwap(S_, s.S_); + S_->reserve(S_->size() + size); } + inline void Swap(TStringOutput& s) noexcept { + DoSwap(S_, s.S_); + } + protected: size_t DoNext(void** ptr) override; void DoUndo(size_t len) override; void DoWrite(const void* buf, size_t len) override; void DoWriteC(char c) override; - -private: - TString* S_; -}; - + +private: + TString* S_; +}; + /** * String input/output stream, similar to `std::stringstream`. */ class TStringStream: private TEmbedPolicy<TString>, public TStringInput, public TStringOutput { using TEmbeddedString = TEmbedPolicy<TString>; -public: - inline TStringStream() +public: + inline TStringStream() : TEmbeddedString() , TStringInput(*TEmbeddedString::Ptr()) , TStringOutput(*TEmbeddedString::Ptr()) - { - } - + { + } + inline TStringStream(const TString& string) : TEmbeddedString(string) , TStringInput(*TEmbeddedString::Ptr()) , TStringOutput(*TEmbeddedString::Ptr()) - { - } + { + } - inline TStringStream(const TStringStream& other) + inline TStringStream(const TStringStream& other) : TEmbeddedString(other.Str()) , TStringInput(*TEmbeddedString::Ptr()) , TStringOutput(*TEmbeddedString::Ptr()) - { - } + { + } - inline TStringStream& operator=(const TStringStream& other) { - // All references remain alive, we need to change position only + inline TStringStream& operator=(const TStringStream& other) { + // All references remain alive, we need to change position only Str() = other.Str(); - Pos_ = other.Pos_; - - return *this; - } + Pos_ = other.Pos_; + + return *this; + } ~TStringStream() override; - + /** * @returns Whether @c this contains any data */ @@ -151,17 +151,17 @@ public: /** * @returns String that this stream is writing into. - */ + */ inline TString& Str() noexcept { return *Ptr(); - } - + } + /** * @returns String that this stream is writing into. */ inline const TString& Str() const noexcept { return *Ptr(); - } + } /** * @returns Pointer to the character data contained @@ -186,7 +186,7 @@ public: * @returns Whether the string that this stream * operates on is empty. */ - Y_PURE_FUNCTION inline bool Empty() const noexcept { + Y_PURE_FUNCTION inline bool Empty() const noexcept { return Str().empty(); } @@ -203,13 +203,13 @@ public: // TODO: compatibility with existing code, remove - Y_PURE_FUNCTION bool empty() const { + Y_PURE_FUNCTION bool empty() const { return Empty(); } void clear() { Clear(); } -}; - +}; + /** @} */ diff --git a/util/stream/str_ut.cpp b/util/stream/str_ut.cpp index fc6b46c31a..76f1879c2c 100644 --- a/util/stream/str_ut.cpp +++ b/util/stream/str_ut.cpp @@ -52,16 +52,16 @@ Y_UNIT_TEST_SUITE(TStringInputOutputTest) { TString string0 = "All animals are equal, but some animals are more equal than others."; TString string1; - for (size_t i = 1; i <= string0.size(); i++) { + for (size_t i = 1; i <= string0.size(); i++) { string1 += string0.substr(0, i); - } + } TStringInput input0(string1); size_t left = 5; - while (left > 0) { + while (left > 0) { left -= input0.Skip(left); - } + } TString string2 = input0.ReadAll(); @@ -116,17 +116,17 @@ Y_UNIT_TEST_SUITE(TStringInputOutputTest) { Y_UNIT_TEST(Write) { TString str; TStringOutput output(str); - output << "1" - << "22" - << "333" - << "4444" - << "55555"; - - UNIT_ASSERT_STRINGS_EQUAL(str, "1" - "22" - "333" - "4444" - "55555"); + output << "1" + << "22" + << "333" + << "4444" + << "55555"; + + UNIT_ASSERT_STRINGS_EQUAL(str, "1" + "22" + "333" + "4444" + "55555"); } Y_UNIT_TEST(WriteChars) { diff --git a/util/stream/tee.cpp b/util/stream/tee.cpp index 99873b95ba..b3ee719fcd 100644 --- a/util/stream/tee.cpp +++ b/util/stream/tee.cpp @@ -1,24 +1,24 @@ -#include "tee.h" - +#include "tee.h" + TTeeOutput::TTeeOutput(IOutputStream* l, IOutputStream* r) noexcept - : L_(l) - , R_(r) -{ + : L_(l) + , R_(r) +{ } TTeeOutput::~TTeeOutput() = default; void TTeeOutput::DoWrite(const void* buf, size_t len) { - L_->Write(buf, len); - R_->Write(buf, len); + L_->Write(buf, len); + R_->Write(buf, len); } void TTeeOutput::DoFlush() { - L_->Flush(); - R_->Flush(); + L_->Flush(); + R_->Flush(); } void TTeeOutput::DoFinish() { - L_->Finish(); - R_->Finish(); + L_->Finish(); + R_->Finish(); } diff --git a/util/stream/tee.h b/util/stream/tee.h index c69e232fb9..7afaa3e260 100644 --- a/util/stream/tee.h +++ b/util/stream/tee.h @@ -1,7 +1,7 @@ #pragma once - -#include "output.h" - + +#include "output.h" + /** * @addtogroup Streams_Multi * @{ @@ -11,16 +11,16 @@ * A proxy output stream that writes into two slave streams simultaneously. */ class TTeeOutput: public IOutputStream { -public: +public: TTeeOutput(IOutputStream* l, IOutputStream* r) noexcept; ~TTeeOutput() override; -private: +private: void DoWrite(const void* buf, size_t len) override; void DoFlush() override; void DoFinish() override; -private: +private: IOutputStream* L_; IOutputStream* R_; }; diff --git a/util/stream/tempbuf.cpp b/util/stream/tempbuf.cpp index 801a1fabb0..50fca60be5 100644 --- a/util/stream/tempbuf.cpp +++ b/util/stream/tempbuf.cpp @@ -1,22 +1,22 @@ -#include "tempbuf.h" - -namespace { +#include "tempbuf.h" + +namespace { static inline size_t Next(size_t size) noexcept { - return size * 2; - } -} - + return size * 2; + } +} + void TTempBufOutput::DoWrite(const void* data, size_t len) { if (Y_LIKELY(len <= Left())) { - Append(data, len); - } else { - const size_t filled = Filled(); - - TTempBuf buf(Next(filled + len)); - - buf.Append(Data(), filled); - buf.Append(data, len); - - static_cast<TTempBuf&>(*this) = buf; - } -} + Append(data, len); + } else { + const size_t filled = Filled(); + + TTempBuf buf(Next(filled + len)); + + buf.Append(Data(), filled); + buf.Append(data, len); + + static_cast<TTempBuf&>(*this) = buf; + } +} diff --git a/util/stream/tempbuf.h b/util/stream/tempbuf.h index a6dc001025..cbf184ae7e 100644 --- a/util/stream/tempbuf.h +++ b/util/stream/tempbuf.h @@ -1,17 +1,17 @@ #pragma once - -#include "output.h" - -#include <util/memory/tempbuf.h> - + +#include "output.h" + +#include <util/memory/tempbuf.h> + class TTempBufOutput: public IOutputStream, public TTempBuf { public: inline TTempBufOutput() = default; explicit TTempBufOutput(size_t size) : TTempBuf(size) - { - } + { + } TTempBufOutput(TTempBufOutput&&) noexcept = default; TTempBufOutput& operator=(TTempBufOutput&&) noexcept = default; diff --git a/util/stream/tokenizer.cpp b/util/stream/tokenizer.cpp index 44e719530a..329e9efb75 100644 --- a/util/stream/tokenizer.cpp +++ b/util/stream/tokenizer.cpp @@ -1 +1 @@ -#include "tokenizer.h" +#include "tokenizer.h" diff --git a/util/stream/tokenizer.h b/util/stream/tokenizer.h index b2398efdd1..a0ec9c3cb2 100644 --- a/util/stream/tokenizer.h +++ b/util/stream/tokenizer.h @@ -1,17 +1,17 @@ #pragma once - -#include "input.h" - + +#include "input.h" + #include <util/generic/buffer.h> #include <util/generic/mem_copy.h> #include <util/generic/strbuf.h> #include <util/system/compiler.h> -#include <util/system/yassert.h> - +#include <util/system/yassert.h> + /** * @addtogroup Streams * @{ - */ + */ /** * Simple stream tokenizer. Splits the stream into tokens that are available @@ -21,147 +21,147 @@ * @see TEol */ template <typename TEndOfToken> -class TStreamTokenizer { +class TStreamTokenizer { public: - class TIterator { - public: + class TIterator { + public: inline TIterator(TStreamTokenizer* const parent) - : Parent_(parent) - , AtEnd_(!Parent_->Next(Data_, Len_)) - { - } - + : Parent_(parent) + , AtEnd_(!Parent_->Next(Data_, Len_)) + { + } + inline TIterator() noexcept - : Parent_(nullptr) - , Data_(nullptr) - , Len_(0) - , AtEnd_(true) - { - } - + : Parent_(nullptr) + , Data_(nullptr) + , Len_(0) + , AtEnd_(true) + { + } + inline ~TIterator() = default; - - inline void operator++() { - Next(); - } - + + inline void operator++() { + Next(); + } + inline bool operator==(const TIterator& l) const noexcept { - return AtEnd_ == l.AtEnd_; - } - + return AtEnd_ == l.AtEnd_; + } + inline bool operator!=(const TIterator& l) const noexcept { - return !(*this == l); - } - + return !(*this == l); + } + /** * @return Return null-terminated character array with current token. * The pointer may be invalid after iterator increment. */ inline const char* Data() const noexcept { Y_ASSERT(!AtEnd_); - - return Data_; - } - + + return Data_; + } + /** * @return Length of current token. */ inline size_t Length() const noexcept { Y_ASSERT(!AtEnd_); - - return Len_; - } - + + return Len_; + } + inline TIterator* operator->() noexcept { - return this; - } - + return this; + } + inline TStringBuf operator*() noexcept { return TStringBuf{Data_, Len_}; - } - - private: - inline void Next() { + } + + private: + inline void Next() { Y_ASSERT(Parent_); - - AtEnd_ = !Parent_->Next(Data_, Len_); - } - - private: + + AtEnd_ = !Parent_->Next(Data_, Len_); + } + + private: TStreamTokenizer* const Parent_; - char* Data_; - size_t Len_; - bool AtEnd_; - }; - + char* Data_; + size_t Len_; + bool AtEnd_; + }; + inline TStreamTokenizer(IInputStream* const input, const TEndOfToken& eot = TEndOfToken(), const size_t initial = 1024) - : Input_(input) + : Input_(input) , Buf_(initial) - , Cur_(BufBegin()) - , End_(BufBegin()) - , Eot_(eot) - { - CheckBuf(); - } - - inline bool Next(char*& buf, size_t& len) { - char* it = Cur_; - - while (true) { - do { - while (it != End_) { - if (Eot_(*it)) { + , Cur_(BufBegin()) + , End_(BufBegin()) + , Eot_(eot) + { + CheckBuf(); + } + + inline bool Next(char*& buf, size_t& len) { + char* it = Cur_; + + while (true) { + do { + while (it != End_) { + if (Eot_(*it)) { *it = '\0'; - - buf = Cur_; - len = it - Cur_; - Cur_ = it + 1; - - return true; - } else { - ++it; - } - } - - if (Fill() == 0 && End_ != BufEnd()) { + + buf = Cur_; + len = it - Cur_; + Cur_ = it + 1; + + return true; + } else { + ++it; + } + } + + if (Fill() == 0 && End_ != BufEnd()) { *it = '\0'; - - buf = Cur_; - len = it - Cur_; - Cur_ = End_; - - return len; - } - } while (it != BufEnd()); - + + buf = Cur_; + len = it - Cur_; + Cur_ = End_; + + return len; + } + } while (it != BufEnd()); + Y_ASSERT(it == BufEnd()); Y_ASSERT(End_ == BufEnd()); - - const size_t blen = End_ - Cur_; - if (Cur_ == BufBegin()) { + + const size_t blen = End_ - Cur_; + if (Cur_ == BufBegin()) { Y_ASSERT(blen == Buf_.Capacity()); - - /* + + /* * do reallocate */ - + Buf_.Reserve(Buf_.Capacity() * 4); - CheckBuf(); - } else { - /* + CheckBuf(); + } else { + /* * do move */ - + MemMove(BufBegin(), Cur_, blen); - } - - Cur_ = BufBegin(); - End_ = Cur_ + blen; - it = End_; - } - } - + } + + Cur_ = BufBegin(); + End_ = Cur_ + blen; + it = End_; + } + } + inline TIterator begin() { return TIterator{this}; } @@ -170,45 +170,45 @@ public: return {}; } -private: - inline size_t Fill() { - const size_t avail = BufEnd() - End_; +private: + inline size_t Fill() { + const size_t avail = BufEnd() - End_; const size_t bytesRead = Input_->Read(End_, avail); - + End_ += bytesRead; - + return bytesRead; - } - + } + inline char* BufBegin() noexcept { return Buf_.Data(); - } - + } + inline char* BufEnd() noexcept { return Buf_.Data() + Buf_.Capacity(); - } - - inline void CheckBuf() const { + } + + inline void CheckBuf() const { if (!Buf_.Data()) { throw std::bad_alloc(); - } - } - -private: + } + } + +private: IInputStream* const Input_; TBuffer Buf_; - char* Cur_; - char* End_; - TEndOfToken Eot_; -}; - + char* Cur_; + char* End_; + TEndOfToken Eot_; +}; + /** * Predicate for `TStreamTokenizer` that uses '\\n' as a delimiter. - */ -struct TEol { + */ +struct TEol { inline bool operator()(char ch) const noexcept { - return ch == '\n'; - } -}; - + return ch == '\n'; + } +}; + /** @} */ diff --git a/util/stream/trace.cpp b/util/stream/trace.cpp index f37a0b76db..53036ec198 100644 --- a/util/stream/trace.cpp +++ b/util/stream/trace.cpp @@ -1 +1 @@ -#include "trace.h" +#include "trace.h" diff --git a/util/stream/trace.h b/util/stream/trace.h index e74b6ecf3e..81228a509f 100644 --- a/util/stream/trace.h +++ b/util/stream/trace.h @@ -5,23 +5,23 @@ /** * Debug level, as set via `DBGOUT` environment variable. */ -enum ETraceLevel: ui8 { - TRACE_ERR = 1, - TRACE_WARN = 2, - TRACE_NOTICE = 3, - TRACE_INFO = 4, - TRACE_DEBUG = 5, - TRACE_DETAIL = 6, - TRACE_VERBOSE = 7 +enum ETraceLevel: ui8 { + TRACE_ERR = 1, + TRACE_WARN = 2, + TRACE_NOTICE = 3, + TRACE_INFO = 4, + TRACE_DEBUG = 5, + TRACE_DETAIL = 6, + TRACE_VERBOSE = 7 }; #if !defined(NDEBUG) && !defined(Y_ENABLE_TRACE) - #define Y_ENABLE_TRACE + #define Y_ENABLE_TRACE #endif #ifdef Y_ENABLE_TRACE - /** + /** * Writes the given data into standard debug stream if current debug level set * via `DBGOUT` environment variable permits it. * @@ -40,21 +40,21 @@ enum ETraceLevel: ui8 { * operator. * @see ETraceLevel */ - #define Y_DBGTRACE(elevel, args) Y_DBGTRACE0(int(TRACE_##elevel), args) - #define Y_DBGTRACE0(level, args) \ - do \ - if ((level) <= StdDbgLevel()) { \ - StdDbgStream() << args << Endl; \ - } \ - while (false) + #define Y_DBGTRACE(elevel, args) Y_DBGTRACE0(int(TRACE_##elevel), args) + #define Y_DBGTRACE0(level, args) \ + do \ + if ((level) <= StdDbgLevel()) { \ + StdDbgStream() << args << Endl; \ + } \ + while (false) #else - #define Y_DBGTRACE(elevel, args) \ - do { \ - } while (false) - #define Y_DBGTRACE0(level, args) \ - do { \ - } while (false) + #define Y_DBGTRACE(elevel, args) \ + do { \ + } while (false) + #define Y_DBGTRACE0(level, args) \ + do { \ + } while (false) #endif diff --git a/util/stream/ut/ya.make b/util/stream/ut/ya.make index f0176dd7b4..4389dc4bc0 100644 --- a/util/stream/ut/ya.make +++ b/util/stream/ut/ya.make @@ -1,28 +1,28 @@ -UNITTEST_FOR(util) +UNITTEST_FOR(util) OWNER(g:util) SUBSCRIBER(g:util-subscribers) SRCS( - stream/aligned_ut.cpp - stream/buffer_ut.cpp - stream/buffered_ut.cpp - stream/direct_io_ut.cpp - stream/file_ut.cpp - stream/format_ut.cpp - stream/hex_ut.cpp - stream/input_ut.cpp - stream/ios_ut.cpp - stream/labeled_ut.cpp - stream/length_ut.cpp - stream/mem_ut.cpp + stream/aligned_ut.cpp + stream/buffer_ut.cpp + stream/buffered_ut.cpp + stream/direct_io_ut.cpp + stream/file_ut.cpp + stream/format_ut.cpp + stream/hex_ut.cpp + stream/input_ut.cpp + stream/ios_ut.cpp + stream/labeled_ut.cpp + stream/length_ut.cpp + stream/mem_ut.cpp stream/multi_ut.cpp - stream/printf_ut.cpp - stream/str_ut.cpp - stream/tokenizer_ut.cpp - stream/walk_ut.cpp + stream/printf_ut.cpp + stream/str_ut.cpp + stream/tokenizer_ut.cpp + stream/walk_ut.cpp stream/zerocopy_output_ut.cpp - stream/zlib_ut.cpp + stream/zlib_ut.cpp ) INCLUDE(${ARCADIA_ROOT}/util/tests/ya_util_tests.inc) diff --git a/util/stream/walk.cpp b/util/stream/walk.cpp index 57dc9ab036..84602105c2 100644 --- a/util/stream/walk.cpp +++ b/util/stream/walk.cpp @@ -1,5 +1,5 @@ -#include "walk.h" - +#include "walk.h" + #include <util/generic/string.h> void IWalkInput::DoUndo(size_t len) { @@ -8,15 +8,15 @@ void IWalkInput::DoUndo(size_t len) { } size_t IWalkInput::DoNext(const void** ptr, size_t len) { - if (!Len_) { - Len_ = DoUnboundedNext(&Buf_); - } - - len = Min(Len_, len); - *ptr = Buf_; - - Buf_ = static_cast<const char*>(Buf_) + len; - Len_ -= len; - - return len; -} + if (!Len_) { + Len_ = DoUnboundedNext(&Buf_); + } + + len = Min(Len_, len); + *ptr = Buf_; + + Buf_ = static_cast<const char*>(Buf_) + len; + Len_ -= len; + + return len; +} diff --git a/util/stream/walk.h b/util/stream/walk.h index 7e62cb44dc..787d243b78 100644 --- a/util/stream/walk.h +++ b/util/stream/walk.h @@ -1,20 +1,20 @@ #pragma once - -#include "zerocopy.h" - + +#include "zerocopy.h" + /** * Zero-copy stream that simplifies implementation of derived classes. * * Derived classes must implement `DoUnboundedNext` method. */ class IWalkInput: public IZeroCopyInputFastReadTo { -public: +public: IWalkInput() : Buf_(nullptr) , Len_(0) - { - } - + { + } + protected: void DoUndo(size_t len) override; size_t DoNext(const void** ptr, size_t len) override; @@ -29,7 +29,7 @@ protected: */ virtual size_t DoUnboundedNext(const void** ptr) = 0; -private: +private: const void* Buf_; size_t Len_; -}; +}; diff --git a/util/stream/walk_ut.cpp b/util/stream/walk_ut.cpp index e0a783799f..739bba33cb 100644 --- a/util/stream/walk_ut.cpp +++ b/util/stream/walk_ut.cpp @@ -5,10 +5,10 @@ class TStringListInput: public IWalkInput { public: TStringListInput(const TVector<TString>& data) - : Data_(data) - , Index_(0) - { - } + : Data_(data) + , Index_(0) + { + } protected: size_t DoUnboundedNext(const void** ptr) override { diff --git a/util/stream/zerocopy.cpp b/util/stream/zerocopy.cpp index dc2982ad55..dffd211c1a 100644 --- a/util/stream/zerocopy.cpp +++ b/util/stream/zerocopy.cpp @@ -1,22 +1,22 @@ #include "zerocopy.h" -#include "output.h" - +#include "output.h" + IZeroCopyInput::~IZeroCopyInput() = default; - + size_t IZeroCopyInput::DoRead(void* buf, size_t len) { const void* ptr; size_t result = DoNext(&ptr, len); - - if (result) { - memcpy(buf, ptr, result); - } - + + if (result) { + memcpy(buf, ptr, result); + } + return result; } ui64 IZeroCopyInput::DoReadAll(IOutputStream& out) { ui64 result = 0; - const void* ptr; + const void* ptr; while (size_t len = Next(&ptr)) { out.Write(ptr, len); @@ -28,7 +28,7 @@ ui64 IZeroCopyInput::DoReadAll(IOutputStream& out) { size_t IZeroCopyInput::DoSkip(size_t len) { const void* ptr; - + return DoNext(&ptr, len); } diff --git a/util/stream/zerocopy.h b/util/stream/zerocopy.h index 3315aa3a51..2c14422362 100644 --- a/util/stream/zerocopy.h +++ b/util/stream/zerocopy.h @@ -7,7 +7,7 @@ #include "input.h" class IOutputStream; - + /** * @addtogroup Streams * @{ @@ -19,7 +19,7 @@ class IOutputStream; * Derived classes must implement `DoNext` method. */ class IZeroCopyInput: public IInputStream { -public: +public: IZeroCopyInput() noexcept = default; ~IZeroCopyInput() override; @@ -37,18 +37,18 @@ public: * @returns Size of the returned data chunk, in bytes. * Return value of zero signals end of stream. */ - template <class T> - inline size_t Next(T** ptr, size_t len) { + template <class T> + inline size_t Next(T** ptr, size_t len) { Y_ASSERT(ptr); - + return DoNext((const void**)ptr, len); - } - - template <class T> - inline size_t Next(T** ptr) { - return Next(ptr, Max<size_t>()); - } - + } + + template <class T> + inline size_t Next(T** ptr) { + return Next(ptr, Max<size_t>()); + } + protected: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; @@ -68,7 +68,7 @@ public: IZeroCopyInputFastReadTo(IZeroCopyInputFastReadTo&&) noexcept = default; IZeroCopyInputFastReadTo& operator=(IZeroCopyInputFastReadTo&&) noexcept = default; - + protected: size_t DoReadTo(TString& st, char ch) override; diff --git a/util/stream/zlib.cpp b/util/stream/zlib.cpp index 60f4e9439f..016b5c7bea 100644 --- a/util/stream/zlib.cpp +++ b/util/stream/zlib.cpp @@ -1,155 +1,155 @@ -#include "zlib.h" - -#include <util/memory/addstorage.h> +#include "zlib.h" + +#include <util/memory/addstorage.h> #include <util/generic/scope.h> -#include <util/generic/utility.h> - -#include <contrib/libs/zlib/zlib.h> - -#include <cstdio> -#include <cstring> - -namespace { - static const int opts[] = { - //Auto - 15 + 32, - //ZLib - 15 + 0, - //GZip - 15 + 16, - //Raw - -15}; - - class TZLibCommon { - public: +#include <util/generic/utility.h> + +#include <contrib/libs/zlib/zlib.h> + +#include <cstdio> +#include <cstring> + +namespace { + static const int opts[] = { + //Auto + 15 + 32, + //ZLib + 15 + 0, + //GZip + 15 + 16, + //Raw + -15}; + + class TZLibCommon { + public: inline TZLibCommon() noexcept { - memset(Z(), 0, sizeof(*Z())); - } - + memset(Z(), 0, sizeof(*Z())); + } + inline ~TZLibCommon() = default; - + inline const char* GetErrMsg() const noexcept { return Z()->msg != nullptr ? Z()->msg : "unknown error"; - } - + } + inline z_stream* Z() const noexcept { - return (z_stream*)(&Z_); - } - - private: - z_stream Z_; - }; - + return (z_stream*)(&Z_); + } + + private: + z_stream Z_; + }; + static inline ui32 MaxPortion(size_t s) noexcept { - return (ui32)Min<size_t>(Max<ui32>(), s); - } - - struct TChunkedZeroCopyInput { + return (ui32)Min<size_t>(Max<ui32>(), s); + } + + struct TChunkedZeroCopyInput { inline TChunkedZeroCopyInput(IZeroCopyInput* in) - : In(in) + : In(in) , Buf(nullptr) - , Len(0) - { - } - - template <class P, class T> - inline bool Next(P** buf, T* len) { - if (!Len) { + , Len(0) + { + } + + template <class P, class T> + inline bool Next(P** buf, T* len) { + if (!Len) { Len = In->Next(&Buf); if (!Len) { - return false; - } - } - - const T toread = (T)Min((size_t)Max<T>(), Len); - - *len = toread; - *buf = (P*)Buf; - - Buf += toread; - Len -= toread; - - return true; - } - + return false; + } + } + + const T toread = (T)Min((size_t)Max<T>(), Len); + + *len = toread; + *buf = (P*)Buf; + + Buf += toread; + Len -= toread; + + return true; + } + IZeroCopyInput* In; - const char* Buf; - size_t Len; - }; -} - -class TZLibDecompress::TImpl: private TZLibCommon, public TChunkedZeroCopyInput { -public: + const char* Buf; + size_t Len; + }; +} + +class TZLibDecompress::TImpl: private TZLibCommon, public TChunkedZeroCopyInput { +public: inline TImpl(IZeroCopyInput* in, ZLib::StreamType type, TStringBuf dict) - : TChunkedZeroCopyInput(in) + : TChunkedZeroCopyInput(in) , Dict(dict) - { - if (inflateInit2(Z(), opts[type]) != Z_OK) { - ythrow TZLibDecompressorError() << "can not init inflate engine"; - } + { + if (inflateInit2(Z(), opts[type]) != Z_OK) { + ythrow TZLibDecompressorError() << "can not init inflate engine"; + } if (dict.size() && type == ZLib::Raw) { SetDict(); } - } - - virtual ~TImpl() { - inflateEnd(Z()); - } - + } + + virtual ~TImpl() { + inflateEnd(Z()); + } + void SetAllowMultipleStreams(bool allowMultipleStreams) { AllowMultipleStreams_ = allowMultipleStreams; } - inline size_t Read(void* buf, size_t size) { - Z()->next_out = (unsigned char*)buf; - Z()->avail_out = size; - - while (true) { - if (Z()->avail_in == 0) { - if (!FillInputBuffer()) { - return 0; - } - } - - switch (inflate(Z(), Z_SYNC_FLUSH)) { + inline size_t Read(void* buf, size_t size) { + Z()->next_out = (unsigned char*)buf; + Z()->avail_out = size; + + while (true) { + if (Z()->avail_in == 0) { + if (!FillInputBuffer()) { + return 0; + } + } + + switch (inflate(Z(), Z_SYNC_FLUSH)) { case Z_NEED_DICT: { SetDict(); continue; } - case Z_STREAM_END: { + case Z_STREAM_END: { if (AllowMultipleStreams_) { if (inflateReset(Z()) != Z_OK) { ythrow TZLibDecompressorError() << "inflate reset error(" << GetErrMsg() << ")"; } } else { return size - Z()->avail_out; - } + } [[fallthrough]]; - } - - case Z_OK: { - const size_t processed = size - Z()->avail_out; - - if (processed) { - return processed; - } - - break; - } - - default: - ythrow TZLibDecompressorError() << "inflate error(" << GetErrMsg() << ")"; - } - } - } - -private: - inline bool FillInputBuffer() { - return Next(&Z()->next_in, &Z()->avail_in); - } + } + + case Z_OK: { + const size_t processed = size - Z()->avail_out; + + if (processed) { + return processed; + } + + break; + } + + default: + ythrow TZLibDecompressorError() << "inflate error(" << GetErrMsg() << ")"; + } + } + } + +private: + inline bool FillInputBuffer() { + return Next(&Z()->next_in, &Z()->avail_in); + } void SetDict() { if (inflateSetDictionary(Z(), (const Bytef*)Dict.data(), Dict.size()) != Z_OK) { @@ -159,55 +159,55 @@ private: bool AllowMultipleStreams_ = true; TStringBuf Dict; -}; - -namespace { +}; + +namespace { class TDecompressStream: public IZeroCopyInput, public TZLibDecompress::TImpl, public TAdditionalStorage<TDecompressStream> { - public: + public: inline TDecompressStream(IInputStream* input, ZLib::StreamType type, TStringBuf dict) : TZLibDecompress::TImpl(this, type, dict) - , Stream_(input) - { - } - + , Stream_(input) + { + } + ~TDecompressStream() override = default; - private: + private: size_t DoNext(const void** ptr, size_t len) override { - void* buf = AdditionalData(); - - *ptr = buf; + void* buf = AdditionalData(); + + *ptr = buf; return Stream_->Read(buf, Min(len, AdditionalDataLength())); - } - - private: + } + + private: IInputStream* Stream_; - }; - + }; + using TZeroCopyDecompress = TZLibDecompress::TImpl; -} - -class TZLibCompress::TImpl: public TAdditionalStorage<TImpl>, private TZLibCommon { +} + +class TZLibCompress::TImpl: public TAdditionalStorage<TImpl>, private TZLibCommon { static inline ZLib::StreamType Type(ZLib::StreamType type) { - if (type == ZLib::Auto) { - return ZLib::ZLib; - } - + if (type == ZLib::Auto) { + return ZLib::ZLib; + } + if (type >= ZLib::Invalid) { ythrow TZLibError() << "invalid compression type: " << static_cast<unsigned long>(type); } - return type; - } - -public: - inline TImpl(const TParams& p) - : Stream_(p.Out) - { + return type; + } + +public: + inline TImpl(const TParams& p) + : Stream_(p.Out) + { if (deflateInit2(Z(), Min<size_t>(9, p.CompressionLevel), Z_DEFLATED, opts[Type(p.Type)], 8, Z_DEFAULT_STRATEGY)) { - ythrow TZLibCompressorError() << "can not init inflate engine"; - } - + ythrow TZLibCompressorError() << "can not init inflate engine"; + } + // Create exactly the same files on all platforms by fixing OS field in the header. if (p.Type == ZLib::GZip) { GZHeader_ = MakeHolder<gz_header>(); @@ -217,56 +217,56 @@ public: if (p.Dict.size()) { if (deflateSetDictionary(Z(), (const Bytef*)p.Dict.data(), p.Dict.size())) { - ythrow TZLibCompressorError() << "can not set deflate dictionary"; - } - } - - Z()->next_out = TmpBuf(); - Z()->avail_out = TmpBufLen(); - } - + ythrow TZLibCompressorError() << "can not set deflate dictionary"; + } + } + + Z()->next_out = TmpBuf(); + Z()->avail_out = TmpBufLen(); + } + inline ~TImpl() { - deflateEnd(Z()); - } - - inline void Write(const void* buf, size_t size) { - const Bytef* b = (const Bytef*)buf; - const Bytef* e = b + size; - + deflateEnd(Z()); + } + + inline void Write(const void* buf, size_t size) { + const Bytef* b = (const Bytef*)buf; + const Bytef* e = b + size; + Y_DEFER { Z()->next_in = nullptr; Z()->avail_in = 0; }; - do { - b = WritePart(b, e); - } while (b < e); - } - - inline const Bytef* WritePart(const Bytef* b, const Bytef* e) { - Z()->next_in = const_cast<Bytef*>(b); - Z()->avail_in = MaxPortion(e - b); - - while (Z()->avail_in) { - const int ret = deflate(Z(), Z_NO_FLUSH); - - switch (ret) { - case Z_OK: - continue; - - case Z_BUF_ERROR: - FlushBuffer(); - - break; - - default: - ythrow TZLibCompressorError() << "deflate error(" << GetErrMsg() << ")"; - } - } - - return Z()->next_in; - } - - inline void Flush() { + do { + b = WritePart(b, e); + } while (b < e); + } + + inline const Bytef* WritePart(const Bytef* b, const Bytef* e) { + Z()->next_in = const_cast<Bytef*>(b); + Z()->avail_in = MaxPortion(e - b); + + while (Z()->avail_in) { + const int ret = deflate(Z(), Z_NO_FLUSH); + + switch (ret) { + case Z_OK: + continue; + + case Z_BUF_ERROR: + FlushBuffer(); + + break; + + default: + ythrow TZLibCompressorError() << "deflate error(" << GetErrMsg() << ")"; + } + } + + return Z()->next_in; + } + + inline void Flush() { int ret = deflate(Z(), Z_SYNC_FLUSH); while ((ret == Z_OK || ret == Z_BUF_ERROR) && !Z()->avail_out) { @@ -281,100 +281,100 @@ public: if (Z()->avail_out < TmpBufLen()) { FlushBuffer(); } - } - - inline void FlushBuffer() { - Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out); - Z()->next_out = TmpBuf(); - Z()->avail_out = TmpBufLen(); - } - - inline void Finish() { - int ret = deflate(Z(), Z_FINISH); - - while (ret == Z_OK || ret == Z_BUF_ERROR) { - FlushBuffer(); - ret = deflate(Z(), Z_FINISH); - } - - if (ret == Z_STREAM_END) { - Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out); - } else { + } + + inline void FlushBuffer() { + Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out); + Z()->next_out = TmpBuf(); + Z()->avail_out = TmpBufLen(); + } + + inline void Finish() { + int ret = deflate(Z(), Z_FINISH); + + while (ret == Z_OK || ret == Z_BUF_ERROR) { + FlushBuffer(); + ret = deflate(Z(), Z_FINISH); + } + + if (ret == Z_STREAM_END) { + Stream_->Write(TmpBuf(), TmpBufLen() - Z()->avail_out); + } else { ythrow TZLibCompressorError() << "deflate finish error(" << GetErrMsg() << ")"; - } - } - -private: + } + } + +private: inline unsigned char* TmpBuf() noexcept { - return (unsigned char*)AdditionalData(); - } - + return (unsigned char*)AdditionalData(); + } + inline size_t TmpBufLen() const noexcept { - return AdditionalDataLength(); - } - -private: + return AdditionalDataLength(); + } + +private: IOutputStream* Stream_; THolder<gz_header> GZHeader_; -}; - +}; + TZLibDecompress::TZLibDecompress(IZeroCopyInput* input, ZLib::StreamType type, TStringBuf dict) : Impl_(new TZeroCopyDecompress(input, type, dict)) -{ -} - +{ +} + TZLibDecompress::TZLibDecompress(IInputStream* input, ZLib::StreamType type, size_t buflen, TStringBuf dict) : Impl_(new (buflen) TDecompressStream(input, type, dict)) -{ -} +{ +} void TZLibDecompress::SetAllowMultipleStreams(bool allowMultipleStreams) { Impl_->SetAllowMultipleStreams(allowMultipleStreams); } TZLibDecompress::~TZLibDecompress() = default; - -size_t TZLibDecompress::DoRead(void* buf, size_t size) { - return Impl_->Read(buf, MaxPortion(size)); -} - + +size_t TZLibDecompress::DoRead(void* buf, size_t size) { + return Impl_->Read(buf, MaxPortion(size)); +} + void TZLibCompress::Init(const TParams& params) { Y_ENSURE(params.BufLen >= 16, "ZLib buffer too small"); Impl_.Reset(new (params.BufLen) TImpl(params)); -} - -void TZLibCompress::TDestruct::Destroy(TImpl* impl) { - delete impl; -} - +} + +void TZLibCompress::TDestruct::Destroy(TImpl* impl) { + delete impl; +} + TZLibCompress::~TZLibCompress() { try { Finish(); } catch (...) { - // ¯\_(ツ)_/¯ + // ¯\_(ツ)_/¯ } -} - -void TZLibCompress::DoWrite(const void* buf, size_t size) { +} + +void TZLibCompress::DoWrite(const void* buf, size_t size) { if (!Impl_) { - ythrow TZLibCompressorError() << "can not write to finished zlib stream"; + ythrow TZLibCompressorError() << "can not write to finished zlib stream"; } - Impl_->Write(buf, size); -} - -void TZLibCompress::DoFlush() { + Impl_->Write(buf, size); +} + +void TZLibCompress::DoFlush() { if (Impl_) { Impl_->Flush(); } -} - -void TZLibCompress::DoFinish() { +} + +void TZLibCompress::DoFinish() { THolder<TImpl> impl(Impl_.Release()); if (impl) { impl->Finish(); } -} - +} + TBufferedZLibDecompress::~TBufferedZLibDecompress() = default; diff --git a/util/stream/zlib.h b/util/stream/zlib.h index e7de7c81b7..dd70488b95 100644 --- a/util/stream/zlib.h +++ b/util/stream/zlib.h @@ -1,42 +1,42 @@ #pragma once - + #include "fwd.h" #include "input.h" #include "output.h" #include "buffered.h" - + #include <util/system/defaults.h> #include <util/generic/ptr.h> #include <util/generic/yexception.h> - + /** * @addtogroup Streams_Archs * @{ */ -struct TZLibError: public yexception { -}; - -struct TZLibCompressorError: public TZLibError { +struct TZLibError: public yexception { }; -struct TZLibDecompressorError: public TZLibError { -}; - -namespace ZLib { - enum StreamType: ui8 { - Auto = 0, /**< Auto detect format. Can be used for decompression only. */ - ZLib = 1, - GZip = 2, +struct TZLibCompressorError: public TZLibError { +}; + +struct TZLibDecompressorError: public TZLibError { +}; + +namespace ZLib { + enum StreamType: ui8 { + Auto = 0, /**< Auto detect format. Can be used for decompression only. */ + ZLib = 1, + GZip = 2, Raw = 3, Invalid = 4 - }; + }; enum { ZLIB_BUF_LEN = 8 * 1024 }; -} - +} + /** * Non-buffered ZLib decompressing stream. * @@ -46,11 +46,11 @@ namespace ZLib { * aka `TZDecompress`. */ class TZLibDecompress: public IInputStream { -public: +public: TZLibDecompress(IZeroCopyInput* input, ZLib::StreamType type = ZLib::Auto, TStringBuf dict = {}); TZLibDecompress(IInputStream* input, ZLib::StreamType type = ZLib::Auto, size_t buflen = ZLib::ZLIB_BUF_LEN, TStringBuf dict = {}); - + /** * Allows/disallows multiple sequential compressed streams. Allowed by default. * @@ -67,92 +67,92 @@ public: protected: size_t DoRead(void* buf, size_t size) override; -public: - class TImpl; - THolder<TImpl> Impl_; -}; - +public: + class TImpl; + THolder<TImpl> Impl_; +}; + /** * Non-buffered ZLib compressing stream. */ class TZLibCompress: public IOutputStream { -public: - struct TParams { +public: + struct TParams { inline TParams(IOutputStream* out) - : Out(out) - , Type(ZLib::ZLib) - , CompressionLevel(6) + : Out(out) + , Type(ZLib::ZLib) + , CompressionLevel(6) , BufLen(ZLib::ZLIB_BUF_LEN) - { - } - + { + } + inline TParams& SetType(ZLib::StreamType type) noexcept { - Type = type; - - return *this; - } - + Type = type; + + return *this; + } + inline TParams& SetCompressionLevel(size_t level) noexcept { - CompressionLevel = level; - - return *this; - } - + CompressionLevel = level; + + return *this; + } + inline TParams& SetBufLen(size_t buflen) noexcept { - BufLen = buflen; - - return *this; - } - + BufLen = buflen; + + return *this; + } + inline TParams& SetDict(const TStringBuf dict) noexcept { - Dict = dict; - - return *this; - } - + Dict = dict; + + return *this; + } + IOutputStream* Out; - ZLib::StreamType Type; - size_t CompressionLevel; - size_t BufLen; - TStringBuf Dict; - }; - - inline TZLibCompress(const TParams& params) { - Init(params); - } - + ZLib::StreamType Type; + size_t CompressionLevel; + size_t BufLen; + TStringBuf Dict; + }; + + inline TZLibCompress(const TParams& params) { + Init(params); + } + inline TZLibCompress(IOutputStream* out, ZLib::StreamType type) { - Init(TParams(out).SetType(type)); - } - + Init(TParams(out).SetType(type)); + } + inline TZLibCompress(IOutputStream* out, ZLib::StreamType type, size_t compression_level) { - Init(TParams(out).SetType(type).SetCompressionLevel(compression_level)); - } - + Init(TParams(out).SetType(type).SetCompressionLevel(compression_level)); + } + inline TZLibCompress(IOutputStream* out, ZLib::StreamType type, size_t compression_level, size_t buflen) { - Init(TParams(out).SetType(type).SetCompressionLevel(compression_level).SetBufLen(buflen)); - } - + Init(TParams(out).SetType(type).SetCompressionLevel(compression_level).SetBufLen(buflen)); + } + ~TZLibCompress() override; - -private: - void Init(const TParams& opts); - + +private: + void Init(const TParams& opts); + void DoWrite(const void* buf, size_t size) override; void DoFlush() override; void DoFinish() override; - -public: - class TImpl; - + +public: + class TImpl; + /** To allow inline constructors. */ - struct TDestruct { - static void Destroy(TImpl* impl); - }; - - THolder<TImpl, TDestruct> Impl_; -}; - + struct TDestruct { + static void Destroy(TImpl* impl); + }; + + THolder<TImpl, TDestruct> Impl_; +}; + /** * Buffered ZLib decompressing stream. * @@ -160,12 +160,12 @@ public: * usage patterns. */ class TBufferedZLibDecompress: public TBuffered<TZLibDecompress> { -public: - template <class T> - inline TBufferedZLibDecompress(T* in, ZLib::StreamType type = ZLib::Auto, size_t buf = 1 << 13) - : TBuffered<TZLibDecompress>(buf, in, type) - { - } +public: + template <class T> + inline TBufferedZLibDecompress(T* in, ZLib::StreamType type = ZLib::Auto, size_t buf = 1 << 13) + : TBuffered<TZLibDecompress>(buf, in, type) + { + } ~TBufferedZLibDecompress() override; }; diff --git a/util/stream/zlib_ut.cpp b/util/stream/zlib_ut.cpp index 2290b4a9de..39e33035f4 100644 --- a/util/stream/zlib_ut.cpp +++ b/util/stream/zlib_ut.cpp @@ -1,14 +1,14 @@ -#include "zlib.h" - +#include "zlib.h" + #include <library/cpp/testing/unittest/registar.h> - -#include "file.h" -#include <util/system/tempfile.h> + +#include "file.h" +#include <util/system/tempfile.h> #include <util/random/entropy.h> #include <util/random/random.h> - -#define ZDATA "./zdata" - + +#define ZDATA "./zdata" + class TThrowingStream: public IOutputStream { public: TThrowingStream(int limit) @@ -54,25 +54,25 @@ private: Y_UNIT_TEST_SUITE(TZLibTest) { static const TString DATA = "8s7d5vc6s5vc67sa4c65ascx6asd4xcv76adsfxv76s"; static const TString DATA2 = "cn8wk2bd9vb3vdfif83g1ks94bfiovtwv"; - + Y_UNIT_TEST(Compress) { TUnbufferedFileOutput o(ZDATA); - TZLibCompress c(&o, ZLib::ZLib); - + TZLibCompress c(&o, ZLib::ZLib); + c.Write(DATA.data(), DATA.size()); - c.Finish(); - o.Finish(); - } - + c.Finish(); + o.Finish(); + } + Y_UNIT_TEST(Decompress) { - TTempFile tmpFile(ZDATA); - - { + TTempFile tmpFile(ZDATA); + + { TUnbufferedFileInput i(ZDATA); - TZLibDecompress d(&i); - + TZLibDecompress d(&i); + UNIT_ASSERT_EQUAL(d.ReadAll(), DATA); - } + } } Y_UNIT_TEST(Dictionary) { @@ -227,4 +227,4 @@ Y_UNIT_TEST_SUITE(TZLibTest) { } } } -} +} |