aboutsummaryrefslogtreecommitdiffstats
path: root/util/stream/buffered.cpp
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/stream/buffered.cpp
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/stream/buffered.cpp')
-rw-r--r--util/stream/buffered.cpp560
1 files changed, 280 insertions, 280 deletions
diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp
index a00e592e1c..471944954c 100644
--- a/util/stream/buffered.cpp
+++ b/util/stream/buffered.cpp
@@ -1,40 +1,40 @@
-#include "mem.h"
-#include "buffered.h"
-
-#include <util/memory/addstorage.h>
-#include <util/generic/yexception.h>
-#include <util/generic/buffer.h>
-
-class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> {
-public:
+#include "mem.h"
+#include "buffered.h"
+
+#include <util/memory/addstorage.h>
+#include <util/generic/yexception.h>
+#include <util/generic/buffer.h>
+
+class TBufferedInput::TImpl: public TAdditionalStorage<TImpl> {
+public:
inline TImpl(IInputStream* slave)
- : Slave_(slave)
+ : Slave_(slave)
, MemInput_(nullptr, 0)
- {
- }
-
+ {
+ }
+
inline ~TImpl() = default;
-
+
inline size_t Next(const void** ptr, size_t len) {
- if (MemInput_.Exhausted()) {
- MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
- }
-
+ if (MemInput_.Exhausted()) {
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
return MemInput_.Next(ptr, len);
- }
-
- inline size_t Read(void* buf, size_t len) {
- if (MemInput_.Exhausted()) {
- if (len > BufLen() / 2) {
- return Slave_->Read(buf, len);
- }
-
- MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
- }
-
- return MemInput_.Read(buf, len);
- }
-
+ }
+
+ inline size_t Read(void* buf, size_t len) {
+ if (MemInput_.Exhausted()) {
+ if (len > BufLen() / 2) {
+ return Slave_->Read(buf, len);
+ }
+
+ MemInput_.Reset(Buf(), Slave_->Read(Buf(), BufLen()));
+ }
+
+ return MemInput_.Read(buf, len);
+ }
+
inline size_t Skip(size_t len) {
size_t totalSkipped = 0;
while (len) {
@@ -66,103 +66,103 @@ public:
st.clear();
TString s_tmp;
-
+
size_t ret = 0;
-
- while (true) {
- if (MemInput_.Exhausted()) {
+
+ while (true) {
+ if (MemInput_.Exhausted()) {
const size_t bytesRead = Slave_->Read(Buf(), BufLen());
-
+
if (!bytesRead) {
- break;
- }
-
+ break;
+ }
+
MemInput_.Reset(Buf(), bytesRead);
- }
-
- const size_t a_len(MemInput_.Avail());
+ }
+
+ const size_t a_len(MemInput_.Avail());
size_t s_len = 0;
if (st.empty()) {
ret += MemInput_.ReadTo(st, to);
s_len = st.length();
- } else {
+ } else {
ret += MemInput_.ReadTo(s_tmp, to);
s_len = s_tmp.length();
st.append(s_tmp);
- }
-
- if (s_len != a_len) {
- break;
- }
- }
-
- return ret;
- }
-
+ }
+
+ if (s_len != a_len) {
+ break;
+ }
+ }
+
+ return ret;
+ }
+
inline void Reset(IInputStream* slave) {
- Slave_ = slave;
- }
-
-private:
+ Slave_ = slave;
+ }
+
+private:
inline size_t BufLen() const noexcept {
- return AdditionalDataLength();
- }
-
+ return AdditionalDataLength();
+ }
+
inline void* Buf() const noexcept {
- return AdditionalData();
- }
-
-private:
+ return AdditionalData();
+ }
+
+private:
IInputStream* Slave_;
- TMemoryInput MemInput_;
-};
-
+ TMemoryInput MemInput_;
+};
+
TBufferedInput::TBufferedInput(IInputStream* slave, size_t buflen)
- : Impl_(new (buflen) TImpl(slave))
-{
-}
-
+ : Impl_(new (buflen) TImpl(slave))
+{
+}
+
TBufferedInput::TBufferedInput(TBufferedInput&&) noexcept = default;
TBufferedInput& TBufferedInput::operator=(TBufferedInput&&) noexcept = default;
TBufferedInput::~TBufferedInput() = default;
-
-size_t TBufferedInput::DoRead(void* buf, size_t len) {
- return Impl_->Read(buf, len);
-}
-
+
+size_t TBufferedInput::DoRead(void* buf, size_t len) {
+ return Impl_->Read(buf, len);
+}
+
size_t TBufferedInput::DoSkip(size_t len) {
return Impl_->Skip(len);
}
size_t TBufferedInput::DoNext(const void** ptr, size_t len) {
- return Impl_->Next(ptr, len);
-}
-
+ return Impl_->Next(ptr, len);
+}
+
size_t TBufferedInput::DoReadTo(TString& st, char ch) {
- return Impl_->ReadTo(st, ch);
-}
-
+ return Impl_->ReadTo(st, ch);
+}
+
void TBufferedInput::Reset(IInputStream* slave) {
- Impl_->Reset(slave);
-}
-
-class TBufferedOutputBase::TImpl {
-public:
+ Impl_->Reset(slave);
+}
+
+class TBufferedOutputBase::TImpl {
+public:
inline TImpl(IOutputStream* slave)
- : Slave_(slave)
- , MemOut_(nullptr, 0)
- , PropagateFlush_(false)
- , PropagateFinish_(false)
- {
- }
-
+ : Slave_(slave)
+ , MemOut_(nullptr, 0)
+ , PropagateFlush_(false)
+ , PropagateFinish_(false)
+ {
+ }
+
virtual ~TImpl() = default;
-
- inline void Reset() {
- MemOut_.Reset(Buf(), Len());
- }
-
+
+ inline void Reset() {
+ MemOut_.Reset(Buf(), Len());
+ }
+
inline size_t Next(void** ptr) {
if (MemOut_.Avail() == 0) {
Slave_->Write(Buf(), Stored());
@@ -178,45 +178,45 @@ public:
MemOut_.Undo(len);
}
- inline void Write(const void* buf, size_t len) {
- if (len <= MemOut_.Avail()) {
- /*
- * fast path
- */
-
- MemOut_.Write(buf, len);
- } else {
- const size_t stored = Stored();
- const size_t full_len = stored + len;
- const size_t good_len = DownToBufferGranularity(full_len);
- const size_t write_from_buf = good_len - stored;
-
+ inline void Write(const void* buf, size_t len) {
+ if (len <= MemOut_.Avail()) {
+ /*
+ * fast path
+ */
+
+ MemOut_.Write(buf, len);
+ } else {
+ const size_t stored = Stored();
+ const size_t full_len = stored + len;
+ const size_t good_len = DownToBufferGranularity(full_len);
+ const size_t write_from_buf = good_len - stored;
+
using TPart = IOutputStream::TPart;
-
+
alignas(TPart) char data[2 * sizeof(TPart)];
TPart* parts = reinterpret_cast<TPart*>(data);
- TPart* end = parts;
-
- if (stored) {
+ TPart* end = parts;
+
+ if (stored) {
new (end++) TPart(Buf(), stored);
- }
-
- if (write_from_buf) {
+ }
+
+ if (write_from_buf) {
new (end++) TPart(buf, write_from_buf);
- }
-
- Slave_->Write(parts, end - parts);
-
- //grow buffer only on full flushes
- OnBufferExhausted();
- Reset();
-
- if (write_from_buf < len) {
- MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf);
- }
- }
- }
-
+ }
+
+ Slave_->Write(parts, end - parts);
+
+ //grow buffer only on full flushes
+ OnBufferExhausted();
+ Reset();
+
+ if (write_from_buf < len) {
+ MemOut_.Write((const char*)buf + write_from_buf, len - write_from_buf);
+ }
+ }
+ }
+
inline void Write(char c) {
if (Y_UNLIKELY(MemOut_.Avail() == 0)) {
Slave_->Write(Buf(), Stored());
@@ -228,145 +228,145 @@ public:
}
inline void SetFlushPropagateMode(bool mode) noexcept {
- PropagateFlush_ = mode;
- }
-
+ PropagateFlush_ = mode;
+ }
+
inline void SetFinishPropagateMode(bool mode) noexcept {
- PropagateFinish_ = mode;
- }
-
- inline void Flush() {
- {
- Slave_->Write(Buf(), Stored());
- Reset();
- }
-
- if (PropagateFlush_) {
- Slave_->Flush();
- }
- }
-
- inline void Finish() {
- try {
- Flush();
- } catch (...) {
+ PropagateFinish_ = mode;
+ }
+
+ inline void Flush() {
+ {
+ Slave_->Write(Buf(), Stored());
+ Reset();
+ }
+
+ if (PropagateFlush_) {
+ Slave_->Flush();
+ }
+ }
+
+ inline void Finish() {
+ try {
+ Flush();
+ } catch (...) {
try {
- DoFinish();
+ DoFinish();
} catch (...) {
- // ¯\_(ツ)_/¯
+ // ¯\_(ツ)_/¯
}
- throw;
+ throw;
}
- DoFinish();
- }
-
-private:
- inline void DoFinish() {
- if (PropagateFinish_) {
- Slave_->Finish();
- }
- }
-
+ DoFinish();
+ }
+
+private:
+ inline void DoFinish() {
+ if (PropagateFinish_) {
+ Slave_->Finish();
+ }
+ }
+
inline size_t Stored() const noexcept {
- return Len() - MemOut_.Avail();
- }
-
+ return Len() - MemOut_.Avail();
+ }
+
inline size_t DownToBufferGranularity(size_t l) const noexcept {
- return l - (l % Len());
- }
-
- virtual void OnBufferExhausted() = 0;
+ return l - (l % Len());
+ }
+
+ virtual void OnBufferExhausted() = 0;
virtual void* Buf() const noexcept = 0;
virtual size_t Len() const noexcept = 0;
-
-private:
+
+private:
IOutputStream* Slave_;
- TMemoryOutput MemOut_;
- bool PropagateFlush_;
- bool PropagateFinish_;
-};
-
-namespace {
- struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> {
+ TMemoryOutput MemOut_;
+ bool PropagateFlush_;
+ bool PropagateFinish_;
+};
+
+namespace {
+ struct TSimpleImpl: public TBufferedOutputBase::TImpl, public TAdditionalStorage<TSimpleImpl> {
inline TSimpleImpl(IOutputStream* slave)
- : TBufferedOutputBase::TImpl(slave)
- {
- Reset();
- }
-
+ : TBufferedOutputBase::TImpl(slave)
+ {
+ Reset();
+ }
+
~TSimpleImpl() override = default;
-
+
void OnBufferExhausted() final {
- }
-
+ }
+
void* Buf() const noexcept override {
- return AdditionalData();
- }
-
+ return AdditionalData();
+ }
+
size_t Len() const noexcept override {
- return AdditionalDataLength();
- }
- };
-
- struct TAdaptiveImpl: public TBufferedOutputBase::TImpl {
- enum {
- Step = 4096
- };
-
+ return AdditionalDataLength();
+ }
+ };
+
+ struct TAdaptiveImpl: public TBufferedOutputBase::TImpl {
+ enum {
+ Step = 4096
+ };
+
inline TAdaptiveImpl(IOutputStream* slave)
- : TBufferedOutputBase::TImpl(slave)
- , N_(0)
- {
- B_.Reserve(Step);
- Reset();
- }
-
+ : TBufferedOutputBase::TImpl(slave)
+ , N_(0)
+ {
+ B_.Reserve(Step);
+ Reset();
+ }
+
~TAdaptiveImpl() override = default;
-
+
void OnBufferExhausted() final {
- const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10);
-
- if (c > B_.Capacity()) {
- TBuffer(c).Swap(B_);
- }
- }
-
+ const size_t c = ((size_t)Step) << Min<size_t>(++N_ / 32, 10);
+
+ if (c > B_.Capacity()) {
+ TBuffer(c).Swap(B_);
+ }
+ }
+
void* Buf() const noexcept override {
- return (void*)B_.Data();
- }
-
+ return (void*)B_.Data();
+ }
+
size_t Len() const noexcept override {
- return B_.Capacity();
- }
-
- TBuffer B_;
- ui64 N_;
- };
-}
-
+ return B_.Capacity();
+ }
+
+ TBuffer B_;
+ ui64 N_;
+ };
+}
+
TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave)
- : Impl_(new TAdaptiveImpl(slave))
-{
-}
-
+ : Impl_(new TAdaptiveImpl(slave))
+{
+}
+
TBufferedOutputBase::TBufferedOutputBase(IOutputStream* slave, size_t buflen)
- : Impl_(new (buflen) TSimpleImpl(slave))
-{
-}
-
+ : Impl_(new (buflen) TSimpleImpl(slave))
+{
+}
+
TBufferedOutputBase::TBufferedOutputBase(TBufferedOutputBase&&) noexcept = default;
TBufferedOutputBase& TBufferedOutputBase::operator=(TBufferedOutputBase&&) noexcept = default;
TBufferedOutputBase::~TBufferedOutputBase() {
- try {
- Finish();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-}
-
+ try {
+ Finish();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
size_t TBufferedOutputBase::DoNext(void** ptr) {
Y_ENSURE(Impl_.Get(), "cannot call next in finished stream");
return Impl_->Next(ptr);
@@ -377,52 +377,52 @@ void TBufferedOutputBase::DoUndo(size_t len) {
Impl_->Undo(len);
}
-void TBufferedOutputBase::DoWrite(const void* data, size_t len) {
+void TBufferedOutputBase::DoWrite(const void* data, size_t len) {
Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
Impl_->Write(data, len);
-}
-
+}
+
void TBufferedOutputBase::DoWriteC(char c) {
Y_ENSURE(Impl_.Get(), "cannot write to finished stream");
Impl_->Write(c);
}
-void TBufferedOutputBase::DoFlush() {
- if (Impl_.Get()) {
- Impl_->Flush();
- }
-}
-
-void TBufferedOutputBase::DoFinish() {
- THolder<TImpl> impl(Impl_.Release());
-
+void TBufferedOutputBase::DoFlush() {
+ if (Impl_.Get()) {
+ Impl_->Flush();
+ }
+}
+
+void TBufferedOutputBase::DoFinish() {
+ THolder<TImpl> impl(Impl_.Release());
+
if (impl) {
- impl->Finish();
- }
-}
-
+ impl->Finish();
+ }
+}
+
void TBufferedOutputBase::SetFlushPropagateMode(bool propagate) noexcept {
- if (Impl_.Get()) {
- Impl_->SetFlushPropagateMode(propagate);
- }
-}
-
+ if (Impl_.Get()) {
+ Impl_->SetFlushPropagateMode(propagate);
+ }
+}
+
void TBufferedOutputBase::SetFinishPropagateMode(bool propagate) noexcept {
- if (Impl_.Get()) {
- Impl_->SetFinishPropagateMode(propagate);
- }
-}
-
+ if (Impl_.Get()) {
+ Impl_->SetFinishPropagateMode(propagate);
+ }
+}
+
TBufferedOutput::TBufferedOutput(IOutputStream* slave, size_t buflen)
- : TBufferedOutputBase(slave, buflen)
-{
-}
-
+ : TBufferedOutputBase(slave, buflen)
+{
+}
+
TBufferedOutput::~TBufferedOutput() = default;
-
+
TAdaptiveBufferedOutput::TAdaptiveBufferedOutput(IOutputStream* slave)
- : TBufferedOutputBase(slave)
-{
-}
-
+ : TBufferedOutputBase(slave)
+{
+}
+
TAdaptiveBufferedOutput::~TAdaptiveBufferedOutput() = default;