summaryrefslogtreecommitdiffstats
path: root/util/stream/buffered.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <[email protected]>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/stream/buffered.cpp
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/stream/buffered.cpp')
-rw-r--r--util/stream/buffered.cpp428
1 files changed, 428 insertions, 0 deletions
diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp
new file mode 100644
index 00000000000..a00e592e1c4
--- /dev/null
+++ b/util/stream/buffered.cpp
@@ -0,0 +1,428 @@
+#include "mem.h"
+#include "buffered.h"
+
+#include <util/memory/addstorage.h>
+#include <util/generic/yexception.h>
+#include <util/generic/buffer.h>
+
+class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> {
+public:
+ inline TImpl(IInputStream* slave)
+ : Slave_(slave)
+ , MemInput_(nullptr, 0)
+ {
+ }
+
+ inline ~TImpl() = default;
+
+ inline size_t Next(const void** ptr, size_t len) {
+ if (MemInput_.Exhausted()) {
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
+ return MemInput_.Next(ptr, len);
+ }
+
+ inline size_t Read(void* buf, size_t len) {
+ if (MemInput_.Exhausted()) {
+ if (len > BufLen() / 2) {
+ return Slave_->Read(buf, len);
+ }
+
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
+ return MemInput_.Read(buf, len);
+ }
+
+ inline size_t Skip(size_t len) {
+ size_t totalSkipped = 0;
+ while (len) {
+ const size_t skipped = DoSkip(len);
+ if (skipped == 0) {
+ break;
+ }
+
+ totalSkipped += skipped;
+ len -= skipped;
+ }
+
+ return totalSkipped;
+ }
+
+ inline size_t DoSkip(size_t len) {
+ if (MemInput_.Exhausted()) {
+ if (len > BufLen() / 2) {
+ return Slave_->Skip(len);
+ }
+
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
+ return MemInput_.Skip(len);
+ }
+
+ inline size_t ReadTo(TString& st, char to) {
+ st.clear();
+
+ TString s_tmp;
+
+ size_t ret = 0;
+
+ while (true) {
+ if (MemInput_.Exhausted()) {
+ const size_t bytesRead = Slave_->Read(Buf(), BufLen());
+
+ if (!bytesRead) {
+ break;
+ }
+
+ MemInput_.Reset(Buf(), bytesRead);
+ }
+
+ const size_t a_len(MemInput_.Avail());
+ size_t s_len = 0;
+ if (st.empty()) {
+ ret += MemInput_.ReadTo(st, to);
+ s_len = st.length();
+ } else {
+ ret += MemInput_.ReadTo(s_tmp, to);
+ s_len = s_tmp.length();
+ st.append(s_tmp);
+ }
+
+ if (s_len != a_len) {
+ break;
+ }
+ }
+
+ return ret;
+ }
+
+ inline void Reset(IInputStream* slave) {
+ Slave_ = slave;
+ }
+
+private:
+ inline size_t BufLen() const noexcept {
+ return AdditionalDataLength();
+ }
+
+ inline void* Buf() const noexcept {
+ return AdditionalData();
+ }
+
+private:
+ IInputStream* Slave_;
+ TMemoryInput MemInput_;
+};
+
+TBufferedInput::TBufferedInput(IInputStream* slave, size_t buflen)
+ : Impl_(new (buflen) TImpl(slave))
+{
+}
+
+TBufferedInput::TBufferedInput(TBufferedInput&&) noexcept = default;
+TBufferedInput& TBufferedInput::operator=(TBufferedInput&&) noexcept = default;
+
+TBufferedInput::~TBufferedInput() = default;
+
+size_t TBufferedInput::DoRead(void* buf, size_t len) {
+ return Impl_->Read(buf, len);
+}
+
+size_t TBufferedInput::DoSkip(size_t len) {
+ return Impl_->Skip(len);
+}
+
+size_t TBufferedInput::DoNext(const void** ptr, size_t len) {
+ return Impl_->Next(ptr, len);
+}
+
+size_t TBufferedInput::DoReadTo(TString& st, char ch) {
+ return Impl_->ReadTo(st, ch);
+}
+
+void TBufferedInput::Reset(IInputStream* slave) {
+ Impl_->Reset(slave);
+}
+
+class TBufferedOutputBase::TImpl {
+public:
+ inline TImpl(IOutputStream* slave)
+ : Slave_(slave)
+ , MemOut_(nullptr, 0)
+ , PropagateFlush_(false)
+ , PropagateFinish_(false)
+ {
+ }
+
+ virtual ~TImpl() = default;
+
+ inline void Reset() {
+ MemOut_.Reset(Buf(), Len());
+ }
+
+ inline size_t Next(void** ptr) {
+ if (MemOut_.Avail() == 0) {
+ Slave_->Write(Buf(), Stored());
+ OnBufferExhausted();
+ Reset();
+ }
+
+ return MemOut_.Next(ptr);
+ }
+
+ inline void Undo(size_t len) {
+ Y_VERIFY(len <= Stored(), "trying to undo more bytes than actually written");
+ MemOut_.Undo(len);
+ }
+
+ inline void Write(const void* buf, size_t len) {
+ if (len <= MemOut_.Avail()) {
+ /*
+ * fast path
+ */
+
+ MemOut_.Write(buf, len);
+ } else {
+ const size_t stored = Stored();
+ const size_t full_len = stored + len;
+ const size_t good_len = DownToBufferGranularity(full_len);
+ const size_t write_from_buf = good_len - stored;
+
+ using TPart = IOutputStream::TPart;
+
+ alignas(TPart) char data[2 * sizeof(TPart)];
+ TPart* parts = reinterpret_cast<TPart*>(data);
+ TPart* end = parts;
+
+ if (stored) {
+ new (end++) TPart(Buf(), stored);
+ }
+
+ if (write_from_buf) {
+ new (end++) TPart(buf, write_from_buf);
+ }
+
+ Slave_->Write(parts, end - parts);
+
+ //grow buffer only on full flushes
+ OnBufferExhausted();
+ Reset();
+
+ if (write_from_buf < len) {
+ MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf);
+ }
+ }
+ }
+
+ inline void Write(char c) {
+ if (Y_UNLIKELY(MemOut_.Avail() == 0)) {
+ Slave_->Write(Buf(), Stored());
+ OnBufferExhausted();
+ Reset();
+ }
+
+ MemOut_.Write(c);
+ }
+
+ inline void SetFlushPropagateMode(bool mode) noexcept {
+ PropagateFlush_ = mode;
+ }
+
+ inline void SetFinishPropagateMode(bool mode) noexcept {
+ PropagateFinish_ = mode;
+ }
+
+ inline void Flush() {
+ {
+ Slave_->Write(Buf(), Stored());
+ Reset();
+ }
+
+ if (PropagateFlush_) {
+ Slave_->Flush();
+ }
+ }
+
+ inline void Finish() {
+ try {
+ Flush();
+ } catch (...) {
+ try {
+ DoFinish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
+ throw;
+ }
+
+ DoFinish();
+ }
+
+private:
+ inline void DoFinish() {
+ if (PropagateFinish_) {
+ Slave_->Finish();
+ }
+ }
+
+ inline size_t Stored() const noexcept {
+ return Len() - MemOut_.Avail();
+ }
+
+ inline size_t DownToBufferGranularity(size_t l) const noexcept {
+ return l - (l % Len());
+ }
+
+ virtual void OnBufferExhausted() = 0;
+ virtual void* Buf() const noexcept = 0;
+ virtual size_t Len() const noexcept = 0;
+
+private:
+ IOutputStream* Slave_;
+ TMemoryOutput MemOut_;
+ bool PropagateFlush_;
+ bool PropagateFinish_;
+};
+
+namespace {
+ struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> {
+ inline TSimpleImpl(IOutputStream* slave)
+ : TBufferedOutputBase::TImpl(slave)
+ {
+ Reset();
+ }
+
+ ~TSimpleImpl() override = default;
+
+ void OnBufferExhausted() final {
+ }
+
+ void* Buf() const noexcept override {
+ return AdditionalData();
+ }
+
+ size_t Len() const noexcept override {
+ return AdditionalDataLength();
+ }
+ };
+
+ struct TAdaptiveImpl: public TBufferedOutputBase::TImpl {
+ enum {
+ Step = 4096
+ };
+
+ inline TAdaptiveImpl(IOutputStream* slave)
+ : TBufferedOutputBase::TImpl(slave)
+ , N_(0)
+ {
+ B_.Reserve(Step);
+ Reset();
+ }
+
+ ~TAdaptiveImpl() override = default;
+
+ void OnBufferExhausted() final {
+ const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10);
+
+ if (c > B_.Capacity()) {
+ TBuffer(c).Swap(B_);
+ }
+ }
+
+ void* Buf() const noexcept override {
+ return (void*)B_.Data();
+ }
+
+ size_t Len() const noexcept override {
+ return B_.Capacity();
+ }
+
+ TBuffer B_;
+ ui64 N_;
+ };
+}
+
+TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave)
+ : Impl_(new TAdaptiveImpl(slave))
+{
+}
+
+TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave, size_t buflen)
+ : Impl_(new (buflen) TSimpleImpl(slave))
+{
+}
+
+TBufferedOutputBase::TBufferedOutputBase(TBufferedOutputBase&&) noexcept = default;
+TBufferedOutputBase& TBufferedOutputBase::operator=(TBufferedOutputBase&&) noexcept = default;
+
+TBufferedOutputBase::~TBufferedOutputBase() {
+ try {
+ Finish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
+size_t TBufferedOutputBase::DoNext(void** ptr) {
+ Y_ENSURE(Impl_.Get(), "cannot call next in finished stream");
+ return Impl_->Next(ptr);
+}
+
+void TBufferedOutputBase::DoUndo(size_t len) {
+ Y_ENSURE(Impl_.Get(), "cannot call undo in finished stream");
+ Impl_->Undo(len);
+}
+
+void TBufferedOutputBase::DoWrite(const void* data, size_t len) {
+ Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
+ Impl_->Write(data, len);
+}
+
+void TBufferedOutputBase::DoWriteC(char c) {
+ Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
+ Impl_->Write(c);
+}
+
+void TBufferedOutputBase::DoFlush() {
+ if (Impl_.Get()) {
+ Impl_->Flush();
+ }
+}
+
+void TBufferedOutputBase::DoFinish() {
+ THolder<TImpl> impl(Impl_.Release());
+
+ if (impl) {
+ impl->Finish();
+ }
+}
+
+void TBufferedOutputBase::SetFlushPropagateMode(bool propagate) noexcept {
+ if (Impl_.Get()) {
+ Impl_->SetFlushPropagateMode(propagate);
+ }
+}
+
+void TBufferedOutputBase::SetFinishPropagateMode(bool propagate) noexcept {
+ if (Impl_.Get()) {
+ Impl_->SetFinishPropagateMode(propagate);
+ }
+}
+
+TBufferedOutput::TBufferedOutput(IOutputStream* slave, size_t buflen)
+ : TBufferedOutputBase(slave, buflen)
+{
+}
+
+TBufferedOutput::~TBufferedOutput() = default;
+
+TAdaptiveBufferedOutput::TAdaptiveBufferedOutput(IOutputStream* slave)
+ : TBufferedOutputBase(slave)
+{
+}
+
+TAdaptiveBufferedOutput::~TAdaptiveBufferedOutput() = default;