diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/stream/buffered.cpp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/stream/buffered.cpp')
-rw-r--r-- | util/stream/buffered.cpp | 560 |
1 files changed, 280 insertions, 280 deletions
diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp index a00e592e1c..471944954c 100644 --- a/util/stream/buffered.cpp +++ b/util/stream/buffered.cpp @@ -1,40 +1,40 @@ -#include "mem.h" -#include "buffered.h" - -#include <util/memory/addstorage.h> -#include <util/generic/yexception.h> -#include <util/generic/buffer.h> - -class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> { -public: +#include "mem.h" +#include "buffered.h" + +#include <util/memory/addstorage.h> +#include <util/generic/yexception.h> +#include <util/generic/buffer.h> + +class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> { +public: inline TImpl(IInputStream* slave) - : Slave_(slave) + : Slave_(slave) , MemInput_(nullptr, 0) - { - } - + { + } + inline ~TImpl() = default; - + inline size_t Next(const void** ptr, size_t len) { - if (MemInput_.Exhausted()) { - MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); - } - + if (MemInput_.Exhausted()) { + MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); + } + return MemInput_.Next(ptr, len); - } - - inline size_t Read(void* buf, size_t len) { - if (MemInput_.Exhausted()) { - if (len > BufLen() / 2) { - return Slave_->Read(buf, len); - } - - MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); - } - - return MemInput_.Read(buf, len); - } - + } + + inline size_t Read(void* buf, size_t len) { + if (MemInput_.Exhausted()) { + if (len > BufLen() / 2) { + return Slave_->Read(buf, len); + } + + MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen())); + } + + return MemInput_.Read(buf, len); + } + inline size_t Skip(size_t len) { size_t totalSkipped = 0; while (len) { @@ -66,103 +66,103 @@ public: st.clear(); TString s_tmp; - + size_t ret = 0; - - while (true) { - if (MemInput_.Exhausted()) { + + while (true) { + if (MemInput_.Exhausted()) { const size_t bytesRead = Slave_->Read(Buf(), BufLen()); - + if (!bytesRead) { - break; - } - + break; + } + MemInput_.Reset(Buf(), bytesRead); - } - - const size_t a_len(MemInput_.Avail()); + } + + const size_t a_len(MemInput_.Avail()); size_t s_len = 0; if (st.empty()) { ret += MemInput_.ReadTo(st, to); s_len = st.length(); - } else { + } else { ret += MemInput_.ReadTo(s_tmp, to); s_len = s_tmp.length(); st.append(s_tmp); - } - - if (s_len != a_len) { - break; - } - } - - return ret; - } - + } + + if (s_len != a_len) { + break; + } + } + + return ret; + } + inline void Reset(IInputStream* slave) { - Slave_ = slave; - } - -private: + Slave_ = slave; + } + +private: inline size_t BufLen() const noexcept { - return AdditionalDataLength(); - } - + return AdditionalDataLength(); + } + inline void* Buf() const noexcept { - return AdditionalData(); - } - -private: + return AdditionalData(); + } + +private: IInputStream* Slave_; - TMemoryInput MemInput_; -}; - + TMemoryInput MemInput_; +}; + TBufferedInput::TBufferedInput(IInputStream* slave, size_t buflen) - : Impl_(new (buflen) TImpl(slave)) -{ -} - + : Impl_(new (buflen) TImpl(slave)) +{ +} + TBufferedInput::TBufferedInput(TBufferedInput&&) noexcept = default; TBufferedInput& TBufferedInput::operator=(TBufferedInput&&) noexcept = default; TBufferedInput::~TBufferedInput() = default; - -size_t TBufferedInput::DoRead(void* buf, size_t len) { - return Impl_->Read(buf, len); -} - + +size_t TBufferedInput::DoRead(void* buf, size_t len) { + return Impl_->Read(buf, len); +} + size_t TBufferedInput::DoSkip(size_t len) { return Impl_->Skip(len); } size_t TBufferedInput::DoNext(const void** ptr, size_t len) { - return Impl_->Next(ptr, len); -} - + return Impl_->Next(ptr, len); +} + size_t TBufferedInput::DoReadTo(TString& st, char ch) { - return Impl_->ReadTo(st, ch); -} - + return Impl_->ReadTo(st, ch); +} + void TBufferedInput::Reset(IInputStream* slave) { - Impl_->Reset(slave); -} - -class TBufferedOutputBase::TImpl { -public: + Impl_->Reset(slave); +} + +class TBufferedOutputBase::TImpl { +public: inline TImpl(IOutputStream* slave) - : Slave_(slave) - , MemOut_(nullptr, 0) - , PropagateFlush_(false) - , PropagateFinish_(false) - { - } - + : Slave_(slave) + , MemOut_(nullptr, 0) + , PropagateFlush_(false) + , PropagateFinish_(false) + { + } + virtual ~TImpl() = default; - - inline void Reset() { - MemOut_.Reset(Buf(), Len()); - } - + + inline void Reset() { + MemOut_.Reset(Buf(), Len()); + } + inline size_t Next(void** ptr) { if (MemOut_.Avail() == 0) { Slave_->Write(Buf(), Stored()); @@ -178,45 +178,45 @@ public: MemOut_.Undo(len); } - inline void Write(const void* buf, size_t len) { - if (len <= MemOut_.Avail()) { - /* - * fast path - */ - - MemOut_.Write(buf, len); - } else { - const size_t stored = Stored(); - const size_t full_len = stored + len; - const size_t good_len = DownToBufferGranularity(full_len); - const size_t write_from_buf = good_len - stored; - + inline void Write(const void* buf, size_t len) { + if (len <= MemOut_.Avail()) { + /* + * fast path + */ + + MemOut_.Write(buf, len); + } else { + const size_t stored = Stored(); + const size_t full_len = stored + len; + const size_t good_len = DownToBufferGranularity(full_len); + const size_t write_from_buf = good_len - stored; + using TPart = IOutputStream::TPart; - + alignas(TPart) char data[2 * sizeof(TPart)]; TPart* parts = reinterpret_cast<TPart*>(data); - TPart* end = parts; - - if (stored) { + TPart* end = parts; + + if (stored) { new (end++) TPart(Buf(), stored); - } - - if (write_from_buf) { + } + + if (write_from_buf) { new (end++) TPart(buf, write_from_buf); - } - - Slave_->Write(parts, end - parts); - - //grow buffer only on full flushes - OnBufferExhausted(); - Reset(); - - if (write_from_buf < len) { - MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf); - } - } - } - + } + + Slave_->Write(parts, end - parts); + + //grow buffer only on full flushes + OnBufferExhausted(); + Reset(); + + if (write_from_buf < len) { + MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf); + } + } + } + inline void Write(char c) { if (Y_UNLIKELY(MemOut_.Avail() == 0)) { Slave_->Write(Buf(), Stored()); @@ -228,145 +228,145 @@ public: } inline void SetFlushPropagateMode(bool mode) noexcept { - PropagateFlush_ = mode; - } - + PropagateFlush_ = mode; + } + inline void SetFinishPropagateMode(bool mode) noexcept { - PropagateFinish_ = mode; - } - - inline void Flush() { - { - Slave_->Write(Buf(), Stored()); - Reset(); - } - - if (PropagateFlush_) { - Slave_->Flush(); - } - } - - inline void Finish() { - try { - Flush(); - } catch (...) { + PropagateFinish_ = mode; + } + + inline void Flush() { + { + Slave_->Write(Buf(), Stored()); + Reset(); + } + + if (PropagateFlush_) { + Slave_->Flush(); + } + } + + inline void Finish() { + try { + Flush(); + } catch (...) { try { - DoFinish(); + DoFinish(); } catch (...) { - // ¯\_(ツ)_/¯ + // ¯\_(ツ)_/¯ } - throw; + throw; } - DoFinish(); - } - -private: - inline void DoFinish() { - if (PropagateFinish_) { - Slave_->Finish(); - } - } - + DoFinish(); + } + +private: + inline void DoFinish() { + if (PropagateFinish_) { + Slave_->Finish(); + } + } + inline size_t Stored() const noexcept { - return Len() - MemOut_.Avail(); - } - + return Len() - MemOut_.Avail(); + } + inline size_t DownToBufferGranularity(size_t l) const noexcept { - return l - (l % Len()); - } - - virtual void OnBufferExhausted() = 0; + return l - (l % Len()); + } + + virtual void OnBufferExhausted() = 0; virtual void* Buf() const noexcept = 0; virtual size_t Len() const noexcept = 0; - -private: + +private: IOutputStream* Slave_; - TMemoryOutput MemOut_; - bool PropagateFlush_; - bool PropagateFinish_; -}; - -namespace { - struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> { + TMemoryOutput MemOut_; + bool PropagateFlush_; + bool PropagateFinish_; +}; + +namespace { + struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> { inline TSimpleImpl(IOutputStream* slave) - : TBufferedOutputBase::TImpl(slave) - { - Reset(); - } - + : TBufferedOutputBase::TImpl(slave) + { + Reset(); + } + ~TSimpleImpl() override = default; - + void OnBufferExhausted() final { - } - + } + void* Buf() const noexcept override { - return AdditionalData(); - } - + return AdditionalData(); + } + size_t Len() const noexcept override { - return AdditionalDataLength(); - } - }; - - struct TAdaptiveImpl: public TBufferedOutputBase::TImpl { - enum { - Step = 4096 - }; - + return AdditionalDataLength(); + } + }; + + struct TAdaptiveImpl: public TBufferedOutputBase::TImpl { + enum { + Step = 4096 + }; + inline TAdaptiveImpl(IOutputStream* slave) - : TBufferedOutputBase::TImpl(slave) - , N_(0) - { - B_.Reserve(Step); - Reset(); - } - + : TBufferedOutputBase::TImpl(slave) + , N_(0) + { + B_.Reserve(Step); + Reset(); + } + ~TAdaptiveImpl() override = default; - + void OnBufferExhausted() final { - const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10); - - if (c > B_.Capacity()) { - TBuffer(c).Swap(B_); - } - } - + const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10); + + if (c > B_.Capacity()) { + TBuffer(c).Swap(B_); + } + } + void* Buf() const noexcept override { - return (void*)B_.Data(); - } - + return (void*)B_.Data(); + } + size_t Len() const noexcept override { - return B_.Capacity(); - } - - TBuffer B_; - ui64 N_; - }; -} - + return B_.Capacity(); + } + + TBuffer B_; + ui64 N_; + }; +} + TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave) - : Impl_(new TAdaptiveImpl(slave)) -{ -} - + : Impl_(new TAdaptiveImpl(slave)) +{ +} + TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave, size_t buflen) - : Impl_(new (buflen) TSimpleImpl(slave)) -{ -} - + : Impl_(new (buflen) TSimpleImpl(slave)) +{ +} + TBufferedOutputBase::TBufferedOutputBase(TBufferedOutputBase&&) noexcept = default; TBufferedOutputBase& TBufferedOutputBase::operator=(TBufferedOutputBase&&) noexcept = default; TBufferedOutputBase::~TBufferedOutputBase() { - try { - Finish(); - } catch (...) { - // ¯\_(ツ)_/¯ - } -} - + try { + Finish(); + } catch (...) { + // ¯\_(ツ)_/¯ + } +} + size_t TBufferedOutputBase::DoNext(void** ptr) { Y_ENSURE(Impl_.Get(), "cannot call next in finished stream"); return Impl_->Next(ptr); @@ -377,52 +377,52 @@ void TBufferedOutputBase::DoUndo(size_t len) { Impl_->Undo(len); } -void TBufferedOutputBase::DoWrite(const void* data, size_t len) { +void TBufferedOutputBase::DoWrite(const void* data, size_t len) { Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); Impl_->Write(data, len); -} - +} + void TBufferedOutputBase::DoWriteC(char c) { Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); Impl_->Write(c); } -void TBufferedOutputBase::DoFlush() { - if (Impl_.Get()) { - Impl_->Flush(); - } -} - -void TBufferedOutputBase::DoFinish() { - THolder<TImpl> impl(Impl_.Release()); - +void TBufferedOutputBase::DoFlush() { + if (Impl_.Get()) { + Impl_->Flush(); + } +} + +void TBufferedOutputBase::DoFinish() { + THolder<TImpl> impl(Impl_.Release()); + if (impl) { - impl->Finish(); - } -} - + impl->Finish(); + } +} + void TBufferedOutputBase::SetFlushPropagateMode(bool propagate) noexcept { - if (Impl_.Get()) { - Impl_->SetFlushPropagateMode(propagate); - } -} - + if (Impl_.Get()) { + Impl_->SetFlushPropagateMode(propagate); + } +} + void TBufferedOutputBase::SetFinishPropagateMode(bool propagate) noexcept { - if (Impl_.Get()) { - Impl_->SetFinishPropagateMode(propagate); - } -} - + if (Impl_.Get()) { + Impl_->SetFinishPropagateMode(propagate); + } +} + TBufferedOutput::TBufferedOutput(IOutputStream* slave, size_t buflen) - : TBufferedOutputBase(slave, buflen) -{ -} - + : TBufferedOutputBase(slave, buflen) +{ +} + TBufferedOutput::~TBufferedOutput() = default; - + TAdaptiveBufferedOutput::TAdaptiveBufferedOutput(IOutputStream* slave) - : TBufferedOutputBase(slave) -{ -} - + : TBufferedOutputBase(slave) +{ +} + TAdaptiveBufferedOutput::~TAdaptiveBufferedOutput() = default; |