diff options
author | nikitamorozov <nikitamorozov@yandex-team.ru> | 2022-02-10 16:49:20 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:20 +0300 |
commit | 523d26598d9784601932189f7fceb34d61bf7641 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /util/stream | |
parent | 2342f2b56e674c21307fcb92a37853f950224d31 (diff) | |
download | ydb-523d26598d9784601932189f7fceb34d61bf7641.tar.gz |
Restoring authorship annotation for <nikitamorozov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'util/stream')
-rw-r--r-- | util/stream/buffer.cpp | 50 | ||||
-rw-r--r-- | util/stream/buffer.h | 8 | ||||
-rw-r--r-- | util/stream/buffer_ut.cpp | 50 | ||||
-rw-r--r-- | util/stream/buffered.cpp | 58 | ||||
-rw-r--r-- | util/stream/buffered.h | 8 | ||||
-rw-r--r-- | util/stream/buffered_ut.cpp | 66 | ||||
-rw-r--r-- | util/stream/fwd.h | 2 | ||||
-rw-r--r-- | util/stream/mem.cpp | 24 | ||||
-rw-r--r-- | util/stream/mem.h | 8 | ||||
-rw-r--r-- | util/stream/mem_ut.cpp | 72 | ||||
-rw-r--r-- | util/stream/str.cpp | 18 | ||||
-rw-r--r-- | util/stream/str.h | 8 | ||||
-rw-r--r-- | util/stream/str_ut.cpp | 52 | ||||
-rw-r--r-- | util/stream/ut/ya.make | 2 | ||||
-rw-r--r-- | util/stream/zerocopy_output.cpp | 36 | ||||
-rw-r--r-- | util/stream/zerocopy_output.h | 114 | ||||
-rw-r--r-- | util/stream/zerocopy_output_ut.cpp | 94 |
17 files changed, 335 insertions, 335 deletions
diff --git a/util/stream/buffer.cpp b/util/stream/buffer.cpp index b7871a9900..2facece4ea 100644 --- a/util/stream/buffer.cpp +++ b/util/stream/buffer.cpp @@ -1,6 +1,6 @@ #include "buffer.h" #include <util/generic/buffer.h> -#include <util/generic/yexception.h> +#include <util/generic/yexception.h> class TBufferOutput::TImpl { public: @@ -11,21 +11,21 @@ public: virtual ~TImpl() = default; - inline size_t DoNext(void** ptr) { - if (Data_.Avail() == 0) { - Data_.Reserve(FastClp2(Data_.Capacity() + MinBufferGrowSize)); - } - size_t previousSize = Data_.size(); - Data_.Resize(Data_.Capacity()); - *ptr = Data_.Begin() + previousSize; - return Data_.Size() - previousSize; - } - - inline void DoUndo(size_t len) { - Y_VERIFY(len <= Data_.Size(), "trying to undo more bytes than actually written"); - Data_.Resize(Data_.size() - len); - } - + inline size_t DoNext(void** ptr) { + if (Data_.Avail() == 0) { + Data_.Reserve(FastClp2(Data_.Capacity() + MinBufferGrowSize)); + } + size_t previousSize = Data_.size(); + Data_.Resize(Data_.Capacity()); + *ptr = Data_.Begin() + previousSize; + return Data_.Size() - previousSize; + } + + inline void DoUndo(size_t len) { + Y_VERIFY(len <= Data_.Size(), "trying to undo more bytes than actually written"); + Data_.Resize(Data_.size() - len); + } + inline void DoWrite(const void* buf, size_t len) { Data_.Append((const char*)buf, len); } @@ -40,7 +40,7 @@ public: private: TBuffer& Data_; - static constexpr size_t MinBufferGrowSize = 16; + static constexpr size_t MinBufferGrowSize = 16; }; namespace { @@ -75,14 +75,14 @@ TBuffer& TBufferOutput::Buffer() const noexcept { return Impl_->Buffer(); } -size_t TBufferOutput::DoNext(void** ptr) { - return Impl_->DoNext(ptr); -} - -void TBufferOutput::DoUndo(size_t len) { - Impl_->DoUndo(len); -} - +size_t TBufferOutput::DoNext(void** ptr) { + return Impl_->DoNext(ptr); +} + +void TBufferOutput::DoUndo(size_t len) { + Impl_->DoUndo(len); +} + void TBufferOutput::DoWrite(const void* buf, size_t len) { Impl_->DoWrite(buf, len); } diff --git a/util/stream/buffer.h b/util/stream/buffer.h index 5956acd09b..9dc99dbe49 100644 --- a/util/stream/buffer.h +++ b/util/stream/buffer.h @@ -1,7 +1,7 @@ #pragma once #include "zerocopy.h" -#include "zerocopy_output.h" +#include "zerocopy_output.h" #include <util/generic/ptr.h> @@ -15,7 +15,7 @@ class TBuffer; /** * Output stream that writes into a `TBuffer`. */ -class TBufferOutput: public IZeroCopyOutput { +class TBufferOutput: public IZeroCopyOutput { public: class TImpl; @@ -46,8 +46,8 @@ public: TBuffer& Buffer() const noexcept; private: - size_t DoNext(void** ptr) override; - void DoUndo(size_t len) override; + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; void DoWrite(const void* buf, size_t len) override; void DoWriteC(char c) override; diff --git a/util/stream/buffer_ut.cpp b/util/stream/buffer_ut.cpp index 29cd63b5b4..3494696190 100644 --- a/util/stream/buffer_ut.cpp +++ b/util/stream/buffer_ut.cpp @@ -33,31 +33,31 @@ Y_UNIT_TEST_SUITE(TBufferTest) { UNIT_ASSERT_VALUES_EQUAL(tmp, "4567890"); } - Y_UNIT_TEST(WriteViaNextAndUndo) { - TBuffer buffer; - TBufferOutput output(buffer); - TString str; - - for (size_t i = 0; i < 10000; ++i) { - str.push_back('a' + (i % 20)); - } - - size_t written = 0; - void* ptr = nullptr; - while (written < str.size()) { - size_t bufferSize = output.Next(&ptr); - UNIT_ASSERT(ptr && bufferSize > 0); - size_t toWrite = Min(bufferSize, str.size() - written); - memcpy(ptr, str.begin() + written, toWrite); - written += toWrite; - if (toWrite < bufferSize) { - output.Undo(bufferSize - toWrite); - } - } - - UNIT_ASSERT(0 == memcmp(buffer.data(), str.begin(), buffer.size())); - } - + Y_UNIT_TEST(WriteViaNextAndUndo) { + TBuffer buffer; + TBufferOutput output(buffer); + TString str; + + for (size_t i = 0; i < 10000; ++i) { + str.push_back('a' + (i % 20)); + } + + size_t written = 0; + void* ptr = nullptr; + while (written < str.size()) { + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT(ptr && bufferSize > 0); + size_t toWrite = Min(bufferSize, str.size() - written); + memcpy(ptr, str.begin() + written, toWrite); + written += toWrite; + if (toWrite < bufferSize) { + output.Undo(bufferSize - toWrite); + } + } + + UNIT_ASSERT(0 == memcmp(buffer.data(), str.begin(), buffer.size())); + } + Y_UNIT_TEST(Write) { TBuffer buffer; TBufferOutput output(buffer); diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp index 770b183af9..a00e592e1c 100644 --- a/util/stream/buffered.cpp +++ b/util/stream/buffered.cpp @@ -163,21 +163,21 @@ public: MemOut_.Reset(Buf(), Len()); } - inline size_t Next(void** ptr) { - if (MemOut_.Avail() == 0) { - Slave_->Write(Buf(), Stored()); - OnBufferExhausted(); - Reset(); - } - - return MemOut_.Next(ptr); - } - - inline void Undo(size_t len) { - Y_VERIFY(len <= Stored(), "trying to undo more bytes than actually written"); - MemOut_.Undo(len); - } - + inline size_t Next(void** ptr) { + if (MemOut_.Avail() == 0) { + Slave_->Write(Buf(), Stored()); + OnBufferExhausted(); + Reset(); + } + + return MemOut_.Next(ptr); + } + + inline void Undo(size_t len) { + Y_VERIFY(len <= Stored(), "trying to undo more bytes than actually written"); + MemOut_.Undo(len); + } + inline void Write(const void* buf, size_t len) { if (len <= MemOut_.Avail()) { /* @@ -367,24 +367,24 @@ TBufferedOutputBase::~TBufferedOutputBase() { } } -size_t TBufferedOutputBase::DoNext(void** ptr) { - Y_ENSURE(Impl_.Get(), "cannot call next in finished stream"); - return Impl_->Next(ptr); -} - -void TBufferedOutputBase::DoUndo(size_t len) { - Y_ENSURE(Impl_.Get(), "cannot call undo in finished stream"); - Impl_->Undo(len); -} - +size_t TBufferedOutputBase::DoNext(void** ptr) { + Y_ENSURE(Impl_.Get(), "cannot call next in finished stream"); + return Impl_->Next(ptr); +} + +void TBufferedOutputBase::DoUndo(size_t len) { + Y_ENSURE(Impl_.Get(), "cannot call undo in finished stream"); + Impl_->Undo(len); +} + void TBufferedOutputBase::DoWrite(const void* data, size_t len) { - Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); - Impl_->Write(data, len); + Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); + Impl_->Write(data, len); } void TBufferedOutputBase::DoWriteC(char c) { - Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); - Impl_->Write(c); + Y_ENSURE(Impl_.Get(), "cannot write to finished stream"); + Impl_->Write(c); } void TBufferedOutputBase::DoFlush() { diff --git a/util/stream/buffered.h b/util/stream/buffered.h index e3209b3e3e..0847186141 100644 --- a/util/stream/buffered.h +++ b/util/stream/buffered.h @@ -1,7 +1,7 @@ #pragma once #include "zerocopy.h" -#include "zerocopy_output.h" +#include "zerocopy_output.h" #include <utility> #include <util/generic/ptr.h> @@ -59,7 +59,7 @@ private: * Also note that this stream does not claim ownership of the underlying stream, * so it's up to the user to free it. */ -class TBufferedOutputBase: public IZeroCopyOutput { +class TBufferedOutputBase: public IZeroCopyOutput { public: /** * Constructs a buffered stream that dynamically adjusts the size of the @@ -111,8 +111,8 @@ public: class TImpl; protected: - size_t DoNext(void** ptr) override; - void DoUndo(size_t len) override; + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; void DoWrite(const void* data, size_t len) override; void DoWriteC(char c) override; void DoFlush() override; diff --git a/util/stream/buffered_ut.cpp b/util/stream/buffered_ut.cpp index 451f31dbc6..41d2fc3030 100644 --- a/util/stream/buffered_ut.cpp +++ b/util/stream/buffered_ut.cpp @@ -73,40 +73,40 @@ Y_UNIT_TEST_SUITE(TestBufferedIO) { UNIT_ASSERT_VALUES_EQUAL(s, "123"); } - template <class TOut> + template <class TOut> inline void DoGenAndWrite(TOut&& output, TString& str) { - TMersenne<ui64> r; - for (size_t i = 0; i < 43210; ++i) { - str.append('A' + (r.GenRand() % 10)); - } - size_t written = 0; - void* ptr = nullptr; - while (written < str.size()) { - size_t bufferSize = output.Next(&ptr); - UNIT_ASSERT(ptr && bufferSize > 0); - size_t toWrite = Min(bufferSize, str.size() - written); - memcpy(ptr, str.begin() + written, toWrite); - written += toWrite; - if (toWrite < bufferSize) { - output.Undo(bufferSize - toWrite); - } - } - } - - Y_UNIT_TEST(TestWriteViaNextAndUndo) { - TString str1, str2; - DoGenAndWrite(TBuffered<TStringOutput>(5000, str1), str2); - - UNIT_ASSERT_STRINGS_EQUAL(str1, str2); - } - - Y_UNIT_TEST(TestWriteViaNextAndUndoAdaptive) { - TString str1, str2; - DoGenAndWrite(TAdaptivelyBuffered<TStringOutput>(str1), str2); - - UNIT_ASSERT_STRINGS_EQUAL(str1, str2); - } - + TMersenne<ui64> r; + for (size_t i = 0; i < 43210; ++i) { + str.append('A' + (r.GenRand() % 10)); + } + size_t written = 0; + void* ptr = nullptr; + while (written < str.size()) { + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT(ptr && bufferSize > 0); + size_t toWrite = Min(bufferSize, str.size() - written); + memcpy(ptr, str.begin() + written, toWrite); + written += toWrite; + if (toWrite < bufferSize) { + output.Undo(bufferSize - toWrite); + } + } + } + + Y_UNIT_TEST(TestWriteViaNextAndUndo) { + TString str1, str2; + DoGenAndWrite(TBuffered<TStringOutput>(5000, str1), str2); + + UNIT_ASSERT_STRINGS_EQUAL(str1, str2); + } + + Y_UNIT_TEST(TestWriteViaNextAndUndoAdaptive) { + TString str1, str2; + DoGenAndWrite(TAdaptivelyBuffered<TStringOutput>(str1), str2); + + UNIT_ASSERT_STRINGS_EQUAL(str1, str2); + } + Y_UNIT_TEST(TestInput) { TString s("0123456789abcdefghijklmn"); TBuffered<TStringInput> in(5, s); diff --git a/util/stream/fwd.h b/util/stream/fwd.h index 7ac9de6719..307676c6a7 100644 --- a/util/stream/fwd.h +++ b/util/stream/fwd.h @@ -7,7 +7,7 @@ class IOutputStream; class IZeroCopyInput; class IZeroCopyInputFastReadTo; -class IZeroCopyOutput; +class IZeroCopyOutput; using TStreamManipulator = void (*)(IOutputStream&); diff --git a/util/stream/mem.cpp b/util/stream/mem.cpp index d57a3145fe..22a3339e27 100644 --- a/util/stream/mem.cpp +++ b/util/stream/mem.cpp @@ -38,19 +38,19 @@ void TMemoryInput::DoUndo(size_t len) { TMemoryOutput::~TMemoryOutput() = default; -size_t TMemoryOutput::DoNext(void** ptr) { +size_t TMemoryOutput::DoNext(void** ptr) { Y_ENSURE(Buf_ < End_, TStringBuf("memory output stream exhausted")); - *ptr = Buf_; - size_t bufferSize = End_ - Buf_; - Buf_ = End_; - - return bufferSize; -} - -void TMemoryOutput::DoUndo(size_t len) { - Buf_ -= len; -} - + *ptr = Buf_; + size_t bufferSize = End_ - Buf_; + Buf_ = End_; + + return bufferSize; +} + +void TMemoryOutput::DoUndo(size_t len) { + Buf_ -= len; +} + void TMemoryOutput::DoWrite(const void* buf, size_t len) { char* end = Buf_ + len; Y_ENSURE(end <= End_, TStringBuf("memory output stream exhausted")); diff --git a/util/stream/mem.h b/util/stream/mem.h index bb14fa93a8..18a5d46772 100644 --- a/util/stream/mem.h +++ b/util/stream/mem.h @@ -1,7 +1,7 @@ #pragma once #include "zerocopy.h" -#include "zerocopy_output.h" +#include "zerocopy_output.h" #include <util/generic/strbuf.h> @@ -108,7 +108,7 @@ private: /** * Output stream that writes data to a memory block. */ -class TMemoryOutput: public IZeroCopyOutput { +class TMemoryOutput: public IZeroCopyOutput { public: /** * Constructs a stream that writes to the provided memory block. It's up @@ -174,8 +174,8 @@ public: } private: - size_t DoNext(void** ptr) override; - void DoUndo(size_t len) override; + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; void DoWrite(const void* buf, size_t len) override; void DoWriteC(char c) override; diff --git a/util/stream/mem_ut.cpp b/util/stream/mem_ut.cpp index e2b3f398b3..f388ae66ac 100644 --- a/util/stream/mem_ut.cpp +++ b/util/stream/mem_ut.cpp @@ -14,42 +14,42 @@ Y_UNIT_TEST_SUITE(TestMemIO) { UNIT_ASSERT_VALUES_EQUAL(t, "89abc"); } - Y_UNIT_TEST(NextAndUndo) { - char buffer[20]; - TMemoryOutput output(buffer, sizeof(buffer)); - char* ptr = nullptr; - size_t bufferSize = output.Next(&ptr); - UNIT_ASSERT_GE(bufferSize, 1); - *ptr = '1'; - if (bufferSize > 1) { - output.Undo(bufferSize - 1); - } - - bufferSize = output.Next(&ptr); - UNIT_ASSERT_GE(bufferSize, 2); - *ptr = '2'; - *(ptr + 1) = '2'; - if (bufferSize > 2) { - output.Undo(bufferSize - 2); - } - - bufferSize = output.Next(&ptr); - UNIT_ASSERT_GE(bufferSize, 3); - *ptr = '3'; - *(ptr + 1) = '3'; - *(ptr + 2) = '3'; - if (bufferSize > 3) { - output.Undo(bufferSize - 3); - } - - output.Finish(); - - const char* const result = "1" - "22" - "333"; - UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result))); - } - + Y_UNIT_TEST(NextAndUndo) { + char buffer[20]; + TMemoryOutput output(buffer, sizeof(buffer)); + char* ptr = nullptr; + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT_GE(bufferSize, 1); + *ptr = '1'; + if (bufferSize > 1) { + output.Undo(bufferSize - 1); + } + + bufferSize = output.Next(&ptr); + UNIT_ASSERT_GE(bufferSize, 2); + *ptr = '2'; + *(ptr + 1) = '2'; + if (bufferSize > 2) { + output.Undo(bufferSize - 2); + } + + bufferSize = output.Next(&ptr); + UNIT_ASSERT_GE(bufferSize, 3); + *ptr = '3'; + *(ptr + 1) = '3'; + *(ptr + 2) = '3'; + if (bufferSize > 3) { + output.Undo(bufferSize - 3); + } + + output.Finish(); + + const char* const result = "1" + "22" + "333"; + UNIT_ASSERT(0 == memcmp(buffer, result, strlen(result))); + } + Y_UNIT_TEST(Write) { char buffer[20]; TMemoryOutput output(buffer, sizeof(buffer)); diff --git a/util/stream/str.cpp b/util/stream/str.cpp index 622f2fa160..13f0e8ef28 100644 --- a/util/stream/str.cpp +++ b/util/stream/str.cpp @@ -1,7 +1,7 @@ #include "str.h" -static constexpr size_t MIN_BUFFER_GROW_SIZE = 16; - +static constexpr size_t MIN_BUFFER_GROW_SIZE = 16; + TStringInput::~TStringInput() = default; size_t TStringInput::DoNext(const void** ptr, size_t len) { @@ -18,21 +18,21 @@ void TStringInput::DoUndo(size_t len) { TStringOutput::~TStringOutput() = default; -size_t TStringOutput::DoNext(void** ptr) { +size_t TStringOutput::DoNext(void** ptr) { if (S_->size() == S_->capacity()) { S_->reserve(FastClp2(S_->capacity() + MIN_BUFFER_GROW_SIZE)); - } + } size_t previousSize = S_->size(); ResizeUninitialized(*S_, S_->capacity()); *ptr = S_->begin() + previousSize; return S_->size() - previousSize; -} - -void TStringOutput::DoUndo(size_t len) { +} + +void TStringOutput::DoUndo(size_t len) { Y_VERIFY(len <= S_->size(), "trying to undo more bytes than actually written"); S_->resize(S_->size() - len); -} - +} + void TStringOutput::DoWrite(const void* buf, size_t len) { S_->append((const char*)buf, len); } diff --git a/util/stream/str.h b/util/stream/str.h index 7e1ce848cc..028bd572c0 100644 --- a/util/stream/str.h +++ b/util/stream/str.h @@ -1,7 +1,7 @@ #pragma once #include "zerocopy.h" -#include "zerocopy_output.h" +#include "zerocopy_output.h" #include <util/generic/string.h> #include <util/generic/noncopyable.h> @@ -61,7 +61,7 @@ private: /** * Stream for writing data into a string. */ -class TStringOutput: public IZeroCopyOutput { +class TStringOutput: public IZeroCopyOutput { public: /** * Constructs a string output stream that appends character data to the @@ -95,8 +95,8 @@ public: } protected: - size_t DoNext(void** ptr) override; - void DoUndo(size_t len) override; + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; void DoWrite(const void* buf, size_t len) override; void DoWriteC(char c) override; diff --git a/util/stream/str_ut.cpp b/util/stream/str_ut.cpp index 264db21dcf..fc6b46c31a 100644 --- a/util/stream/str_ut.cpp +++ b/util/stream/str_ut.cpp @@ -6,7 +6,7 @@ template <typename T> const T ReturnConstTemp(); -Y_UNIT_TEST_SUITE(TStringInputOutputTest) { +Y_UNIT_TEST_SUITE(TStringInputOutputTest) { Y_UNIT_TEST(Lvalue) { TString str = "Hello, World!"; TStringInput input(str); @@ -88,31 +88,31 @@ Y_UNIT_TEST_SUITE(TStringInputOutputTest) { UNIT_ASSERT_VALUES_EQUAL(t, "89abc"); } - Y_UNIT_TEST(WriteViaNextAndUndo) { - TString str1; - TStringOutput output(str1); - TString str2; - - for (size_t i = 0; i < 10000; ++i) { - str2.push_back('a' + (i % 20)); - } - - size_t written = 0; - void* ptr = nullptr; - while (written < str2.size()) { - size_t bufferSize = output.Next(&ptr); - UNIT_ASSERT(ptr && bufferSize > 0); - size_t toWrite = Min(bufferSize, str2.size() - written); - memcpy(ptr, str2.begin() + written, toWrite); - written += toWrite; - if (toWrite < bufferSize) { - output.Undo(bufferSize - toWrite); - } - } - - UNIT_ASSERT_STRINGS_EQUAL(str1, str2); - } - + Y_UNIT_TEST(WriteViaNextAndUndo) { + TString str1; + TStringOutput output(str1); + TString str2; + + for (size_t i = 0; i < 10000; ++i) { + str2.push_back('a' + (i % 20)); + } + + size_t written = 0; + void* ptr = nullptr; + while (written < str2.size()) { + size_t bufferSize = output.Next(&ptr); + UNIT_ASSERT(ptr && bufferSize > 0); + size_t toWrite = Min(bufferSize, str2.size() - written); + memcpy(ptr, str2.begin() + written, toWrite); + written += toWrite; + if (toWrite < bufferSize) { + output.Undo(bufferSize - toWrite); + } + } + + UNIT_ASSERT_STRINGS_EQUAL(str1, str2); + } + Y_UNIT_TEST(Write) { TString str; TStringOutput output(str); diff --git a/util/stream/ut/ya.make b/util/stream/ut/ya.make index 9699630190..f0176dd7b4 100644 --- a/util/stream/ut/ya.make +++ b/util/stream/ut/ya.make @@ -21,7 +21,7 @@ SRCS( stream/str_ut.cpp stream/tokenizer_ut.cpp stream/walk_ut.cpp - stream/zerocopy_output_ut.cpp + stream/zerocopy_output_ut.cpp stream/zlib_ut.cpp ) diff --git a/util/stream/zerocopy_output.cpp b/util/stream/zerocopy_output.cpp index 97777724d1..23600ef6e1 100644 --- a/util/stream/zerocopy_output.cpp +++ b/util/stream/zerocopy_output.cpp @@ -1,18 +1,18 @@ -#include "zerocopy_output.h" - -#include <util/generic/utility.h> - -void IZeroCopyOutput::DoWrite(const void* buf, size_t len) { - void* ptr = nullptr; - size_t writtenBytes = 0; - while (writtenBytes < len) { - size_t bufferSize = DoNext(&ptr); - Y_ASSERT(ptr && bufferSize > 0); - size_t toWrite = Min(bufferSize, len - writtenBytes); - memcpy(ptr, static_cast<const char*>(buf) + writtenBytes, toWrite); - writtenBytes += toWrite; - if (toWrite < bufferSize) { - DoUndo(bufferSize - toWrite); - } - } -} +#include "zerocopy_output.h" + +#include <util/generic/utility.h> + +void IZeroCopyOutput::DoWrite(const void* buf, size_t len) { + void* ptr = nullptr; + size_t writtenBytes = 0; + while (writtenBytes < len) { + size_t bufferSize = DoNext(&ptr); + Y_ASSERT(ptr && bufferSize > 0); + size_t toWrite = Min(bufferSize, len - writtenBytes); + memcpy(ptr, static_cast<const char*>(buf) + writtenBytes, toWrite); + writtenBytes += toWrite; + if (toWrite < bufferSize) { + DoUndo(bufferSize - toWrite); + } + } +} diff --git a/util/stream/zerocopy_output.h b/util/stream/zerocopy_output.h index 1986e76143..a388be04b0 100644 --- a/util/stream/zerocopy_output.h +++ b/util/stream/zerocopy_output.h @@ -1,57 +1,57 @@ -#pragma once - -#include <util/system/yassert.h> - -#include "output.h" - -/** - * @addtogroup Streams - * @{ - */ - -/** - * Output stream with direct access to the output buffer. - * - * Derived classes must implement `DoNext` and `DoUndo` methods. - */ -class IZeroCopyOutput: public IOutputStream { -public: - IZeroCopyOutput() noexcept = default; - ~IZeroCopyOutput() override = default; - - IZeroCopyOutput(IZeroCopyOutput&&) noexcept = default; - IZeroCopyOutput& operator=(IZeroCopyOutput&&) noexcept = default; - - /** - * Returns the next buffer to write to from this output stream. - * - * @param ptr[out] Pointer to the start of the buffer. - * @returns Size of the returned buffer, in bytes. - * Return value is always nonzero. - */ - template <class T> - inline size_t Next(T** ptr) { - Y_ASSERT(ptr); - - return DoNext((void**)ptr); - } - - /** - * Tells the stream that `len` bytes at the end of the buffer returned previously - * by Next were actually not written so the current position in stream must be moved backwards. - * `len` must not be greater than the size of the buffer previously returned by `Next`. - * - * @param len[in] Number of bytes at the end to move the position by. - * - */ - inline void Undo(size_t len) { - return DoUndo(len); - } - -protected: - void DoWrite(const void* buf, size_t len) override; - virtual size_t DoNext(void** ptr) = 0; - virtual void DoUndo(size_t len) = 0; -}; - -/** @} */ +#pragma once + +#include <util/system/yassert.h> + +#include "output.h" + +/** + * @addtogroup Streams + * @{ + */ + +/** + * Output stream with direct access to the output buffer. + * + * Derived classes must implement `DoNext` and `DoUndo` methods. + */ +class IZeroCopyOutput: public IOutputStream { +public: + IZeroCopyOutput() noexcept = default; + ~IZeroCopyOutput() override = default; + + IZeroCopyOutput(IZeroCopyOutput&&) noexcept = default; + IZeroCopyOutput& operator=(IZeroCopyOutput&&) noexcept = default; + + /** + * Returns the next buffer to write to from this output stream. + * + * @param ptr[out] Pointer to the start of the buffer. + * @returns Size of the returned buffer, in bytes. + * Return value is always nonzero. + */ + template <class T> + inline size_t Next(T** ptr) { + Y_ASSERT(ptr); + + return DoNext((void**)ptr); + } + + /** + * Tells the stream that `len` bytes at the end of the buffer returned previously + * by Next were actually not written so the current position in stream must be moved backwards. + * `len` must not be greater than the size of the buffer previously returned by `Next`. + * + * @param len[in] Number of bytes at the end to move the position by. + * + */ + inline void Undo(size_t len) { + return DoUndo(len); + } + +protected: + void DoWrite(const void* buf, size_t len) override; + virtual size_t DoNext(void** ptr) = 0; + virtual void DoUndo(size_t len) = 0; +}; + +/** @} */ diff --git a/util/stream/zerocopy_output_ut.cpp b/util/stream/zerocopy_output_ut.cpp index 8d58a2e848..e81f7fb056 100644 --- a/util/stream/zerocopy_output_ut.cpp +++ b/util/stream/zerocopy_output_ut.cpp @@ -1,48 +1,48 @@ -#include "zerocopy_output.h" - +#include "zerocopy_output.h" + #include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/string.h> - -// This version of string output stream is written here only -// for testing IZeroCopyOutput implementation of DoWrite. -class TSimpleStringOutput: public IZeroCopyOutput { -public: - TSimpleStringOutput(TString& s) noexcept - : S_(s) - { - } - -private: - size_t DoNext(void** ptr) override { - if (S_.size() == S_.capacity()) { - S_.reserve(FastClp2(S_.capacity() + 1)); - } - size_t previousSize = S_.size(); - S_.resize(S_.capacity()); - *ptr = S_.begin() + previousSize; - return S_.size() - previousSize; - } - - void DoUndo(size_t len) override { - Y_ENSURE(len <= S_.size()); - S_.resize(S_.size() - len); - } - - TString& S_; -}; - -Y_UNIT_TEST_SUITE(TestZerocopyOutput) { - Y_UNIT_TEST(Write) { - TString str; - TSimpleStringOutput output(str); - TString result; - - for (size_t i = 0; i < 1000; ++i) { - result.push_back('a' + (i % 20)); - } - - output.Write(result.begin(), result.size()); - UNIT_ASSERT_STRINGS_EQUAL(str, result); - } -} + +#include <util/generic/string.h> + +// This version of string output stream is written here only +// for testing IZeroCopyOutput implementation of DoWrite. +class TSimpleStringOutput: public IZeroCopyOutput { +public: + TSimpleStringOutput(TString& s) noexcept + : S_(s) + { + } + +private: + size_t DoNext(void** ptr) override { + if (S_.size() == S_.capacity()) { + S_.reserve(FastClp2(S_.capacity() + 1)); + } + size_t previousSize = S_.size(); + S_.resize(S_.capacity()); + *ptr = S_.begin() + previousSize; + return S_.size() - previousSize; + } + + void DoUndo(size_t len) override { + Y_ENSURE(len <= S_.size()); + S_.resize(S_.size() - len); + } + + TString& S_; +}; + +Y_UNIT_TEST_SUITE(TestZerocopyOutput) { + Y_UNIT_TEST(Write) { + TString str; + TSimpleStringOutput output(str); + TString result; + + for (size_t i = 0; i < 1000; ++i) { + result.push_back('a' + (i % 20)); + } + + output.Write(result.begin(), result.size()); + UNIT_ASSERT_STRINGS_EQUAL(str, result); + } +} |