diff options
author | iddqd <iddqd@yandex-team.ru> | 2022-02-10 16:49:45 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:45 +0300 |
commit | 07fce9c5f7771600d0b3d70e1f88fd8a7e164d85 (patch) | |
tree | e4aa4750fbb864d70f8c06cf03d2750e979ea3bf /util/stream | |
parent | af42068bf6cd93c976b80dd0388fa48cdf65da11 (diff) | |
download | ydb-07fce9c5f7771600d0b3d70e1f88fd8a7e164d85.tar.gz |
Restoring authorship annotation for <iddqd@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util/stream')
-rw-r--r-- | util/stream/buffer.cpp | 10 | ||||
-rw-r--r-- | util/stream/buffer.h | 2 | ||||
-rw-r--r-- | util/stream/buffered.cpp | 16 | ||||
-rw-r--r-- | util/stream/buffered_ut.cpp | 18 | ||||
-rw-r--r-- | util/stream/direct_io.cpp | 78 | ||||
-rw-r--r-- | util/stream/direct_io.h | 60 | ||||
-rw-r--r-- | util/stream/direct_io_ut.cpp | 14 | ||||
-rw-r--r-- | util/stream/ios_ut.cpp | 100 | ||||
-rw-r--r-- | util/stream/mem.cpp | 2 | ||||
-rw-r--r-- | util/stream/mem.h | 2 | ||||
-rw-r--r-- | util/stream/str.cpp | 10 | ||||
-rw-r--r-- | util/stream/str.h | 2 | ||||
-rw-r--r-- | util/stream/walk.cpp | 4 | ||||
-rw-r--r-- | util/stream/walk.h | 2 | ||||
-rw-r--r-- | util/stream/zerocopy.cpp | 42 | ||||
-rw-r--r-- | util/stream/zerocopy.h | 50 |
16 files changed, 206 insertions, 206 deletions
diff --git a/util/stream/buffer.cpp b/util/stream/buffer.cpp index 2facece4ea..bc2159dff8 100644 --- a/util/stream/buffer.cpp +++ b/util/stream/buffer.cpp @@ -113,8 +113,8 @@ size_t TBufferInput::DoNext(const void** ptr, size_t len) { Readed_ += len; return len; } - -void TBufferInput::DoUndo(size_t len) { - Y_VERIFY(len <= Readed_); - Readed_ -= len; -} + +void TBufferInput::DoUndo(size_t len) { + Y_VERIFY(len <= Readed_); + Readed_ -= len; +} diff --git a/util/stream/buffer.h b/util/stream/buffer.h index 9dc99dbe49..c77960bcb6 100644 --- a/util/stream/buffer.h +++ b/util/stream/buffer.h @@ -77,7 +77,7 @@ public: protected: size_t DoNext(const void** ptr, size_t len) override; - void DoUndo(size_t len) override; + void DoUndo(size_t len) override; private: const TBuffer& Buf_; diff --git a/util/stream/buffered.cpp b/util/stream/buffered.cpp index a00e592e1c..de4ec8c756 100644 --- a/util/stream/buffered.cpp +++ b/util/stream/buffered.cpp @@ -35,7 +35,7 @@ public: return MemInput_.Read(buf, len); } - inline size_t Skip(size_t len) { + inline size_t Skip(size_t len) { size_t totalSkipped = 0; while (len) { const size_t skipped = DoSkip(len); @@ -45,11 +45,11 @@ public: totalSkipped += skipped; len -= skipped; - } + } return totalSkipped; - } - + } + inline size_t DoSkip(size_t len) { if (MemInput_.Exhausted()) { if (len > BufLen() / 2) { @@ -131,10 +131,10 @@ size_t TBufferedInput::DoRead(void* buf, size_t len) { return Impl_->Read(buf, len); } -size_t TBufferedInput::DoSkip(size_t len) { - return Impl_->Skip(len); -} - +size_t TBufferedInput::DoSkip(size_t len) { + return Impl_->Skip(len); +} + size_t TBufferedInput::DoNext(const void** ptr, size_t len) { return Impl_->Next(ptr, len); } diff --git a/util/stream/buffered_ut.cpp b/util/stream/buffered_ut.cpp index 41d2fc3030..62a32776de 100644 --- a/util/stream/buffered_ut.cpp +++ b/util/stream/buffered_ut.cpp @@ -59,7 +59,7 @@ Y_UNIT_TEST_SUITE(TestBufferedIO) { UNIT_ASSERT_VALUES_EQUAL(s, "112"); } - + Y_UNIT_TEST(Test4) { TString s; @@ -109,24 +109,24 @@ Y_UNIT_TEST_SUITE(TestBufferedIO) { Y_UNIT_TEST(TestInput) { TString s("0123456789abcdefghijklmn"); - TBuffered<TStringInput> in(5, s); - char c; + TBuffered<TStringInput> in(5, s); + char c; UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //1 - UNIT_ASSERT_VALUES_EQUAL(c, '0'); + UNIT_ASSERT_VALUES_EQUAL(c, '0'); UNIT_ASSERT_VALUES_EQUAL(in.Skip(4), 4); //5 end of buffer UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //6 - UNIT_ASSERT_VALUES_EQUAL(c, '5'); + UNIT_ASSERT_VALUES_EQUAL(c, '5'); UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //9 UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //10 end of buffer - UNIT_ASSERT_VALUES_EQUAL(c, '9'); + UNIT_ASSERT_VALUES_EQUAL(c, '9'); UNIT_ASSERT_VALUES_EQUAL(in.Skip(3), 3); //13 UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //14 start new buffer - UNIT_ASSERT_VALUES_EQUAL(c, 'd'); + UNIT_ASSERT_VALUES_EQUAL(c, 'd'); UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 6); //20 UNIT_ASSERT_VALUES_EQUAL(in.Read(&c, 1), 1); //21 start new buffer - UNIT_ASSERT_VALUES_EQUAL(c, 'k'); + UNIT_ASSERT_VALUES_EQUAL(c, 'k'); UNIT_ASSERT_VALUES_EQUAL(in.Skip(6), 3); //24 eof - } + } Y_UNIT_TEST(TestReadTo) { TString s("0123456789abc"); diff --git a/util/stream/direct_io.cpp b/util/stream/direct_io.cpp index 649033af34..5aab0830e1 100644 --- a/util/stream/direct_io.cpp +++ b/util/stream/direct_io.cpp @@ -1,47 +1,47 @@ -#include "direct_io.h" - +#include "direct_io.h" + #include <util/generic/utility.h> -size_t TRandomAccessFileInput::DoRead(void* buf, size_t len) { - const size_t result = File.Pread(buf, len, Position); - Position += result; - return result; -} - -TRandomAccessFileInput::TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position) - : File(file) - , Position(position) -{ -} - -size_t TRandomAccessFileInput::DoSkip(size_t len) { +size_t TRandomAccessFileInput::DoRead(void* buf, size_t len) { + const size_t result = File.Pread(buf, len, Position); + Position += result; + return result; +} + +TRandomAccessFileInput::TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position) + : File(file) + , Position(position) +{ +} + +size_t TRandomAccessFileInput::DoSkip(size_t len) { size_t skiped = Min(len, (size_t)Min((ui64)Max<size_t>(), File.GetLength() - Position)); - Position += skiped; - return skiped; -} - -TRandomAccessFileOutput::TRandomAccessFileOutput(TDirectIOBufferedFile& file) + Position += skiped; + return skiped; +} + +TRandomAccessFileOutput::TRandomAccessFileOutput(TDirectIOBufferedFile& file) : File(&file) -{ -} - -void TRandomAccessFileOutput::DoWrite(const void* buf, size_t len) { +{ +} + +void TRandomAccessFileOutput::DoWrite(const void* buf, size_t len) { File->Write(buf, len); -} - -void TRandomAccessFileOutput::DoFlush() { +} + +void TRandomAccessFileOutput::DoFlush() { File->FlushData(); -} - +} + TBufferedFileOutputEx::TBufferedFileOutputEx(const TString& path, EOpenMode oMode, size_t buflen) - : TRandomAccessFileOutput(*(new TDirectIOBufferedFile(path, oMode, buflen))) + : TRandomAccessFileOutput(*(new TDirectIOBufferedFile(path, oMode, buflen))) , FileHolder(File) -{ -} - -void TBufferedFileOutputEx::DoFinish() { - FileHolder->Finish(); -} - -void TBufferedFileOutputEx::DoFlush() { -} +{ +} + +void TBufferedFileOutputEx::DoFinish() { + FileHolder->Finish(); +} + +void TBufferedFileOutputEx::DoFlush() { +} diff --git a/util/stream/direct_io.h b/util/stream/direct_io.h index 2e1f2e07dd..0c55f15a95 100644 --- a/util/stream/direct_io.h +++ b/util/stream/direct_io.h @@ -1,43 +1,43 @@ -#pragma once - -#include "input.h" -#include "output.h" -#include <util/system/direct_io.h> - +#pragma once + +#include "input.h" +#include "output.h" +#include <util/system/direct_io.h> + class TRandomAccessFileInput: public IInputStream { -public: - TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position); - -protected: +public: + TRandomAccessFileInput(TDirectIOBufferedFile& file, ui64 position); + +protected: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; - -private: - TDirectIOBufferedFile& File; - ui64 Position; -}; - + +private: + TDirectIOBufferedFile& File; + ui64 Position; +}; + class TRandomAccessFileOutput: public IOutputStream { -public: - TRandomAccessFileOutput(TDirectIOBufferedFile& file); - +public: + TRandomAccessFileOutput(TDirectIOBufferedFile& file); + TRandomAccessFileOutput(TRandomAccessFileOutput&&) noexcept = default; TRandomAccessFileOutput& operator=(TRandomAccessFileOutput&&) noexcept = default; -protected: +protected: TDirectIOBufferedFile* File; - -private: + +private: void DoWrite(const void* buf, size_t len) override; void DoFlush() override; -}; - -class TBufferedFileOutputEx: public TRandomAccessFileOutput { -public: +}; + +class TBufferedFileOutputEx: public TRandomAccessFileOutput { +public: TBufferedFileOutputEx(const TString& path, EOpenMode oMode, size_t buflen = 1 << 17); - -private: + +private: void DoFlush() override; void DoFinish() override; - THolder<TDirectIOBufferedFile> FileHolder; -}; + THolder<TDirectIOBufferedFile> FileHolder; +}; diff --git a/util/stream/direct_io_ut.cpp b/util/stream/direct_io_ut.cpp index 01d09db232..0dbc2454f2 100644 --- a/util/stream/direct_io_ut.cpp +++ b/util/stream/direct_io_ut.cpp @@ -1,12 +1,12 @@ #include <library/cpp/testing/unittest/registar.h> - + #include <util/generic/string.h> #include <util/generic/array_size.h> #include <util/system/env.h> - + #include "buffered.h" #include "direct_io.h" - + Y_UNIT_TEST_SUITE(TDirectIOTests) { // Decrease numBufToWrite further if tests continue to time out static void Y_NO_INLINE Test(EOpenMode mode, size_t numBufToWrite) { @@ -55,8 +55,8 @@ Y_UNIT_TEST_SUITE(TDirectIOTests) { } UNIT_ASSERT_VALUES_EQUAL(bytesRead, numBufToWrite * BUFFER_SIZE); - } - + } + Y_UNIT_TEST(ReadWriteTest) { Test(0, 100 * 32); } @@ -67,5 +67,5 @@ Y_UNIT_TEST_SUITE(TDirectIOTests) { Y_UNIT_TEST(ReadWriteDirectSeqTest) { Test(Direct | Seq, 100 * 4); - } -} + } +} diff --git a/util/stream/ios_ut.cpp b/util/stream/ios_ut.cpp index 139f4296e5..2094295103 100644 --- a/util/stream/ios_ut.cpp +++ b/util/stream/ios_ut.cpp @@ -2,7 +2,7 @@ #include "tokenizer.h" #include "buffer.h" #include "buffered.h" -#include "walk.h" +#include "walk.h" #include <library/cpp/testing/unittest/registar.h> @@ -23,8 +23,8 @@ class TStreamsTest: public TTestBase { UNIT_TEST(TestBufferStream); UNIT_TEST(TestStringStream); UNIT_TEST(TestWtrokaInput); - UNIT_TEST(TestStrokaInput); - UNIT_TEST(TestReadTo); + UNIT_TEST(TestStrokaInput); + UNIT_TEST(TestReadTo); UNIT_TEST(TestWtrokaOutput); UNIT_TEST(TestIStreamOperators); UNIT_TEST(TestWchar16Output); @@ -41,10 +41,10 @@ public: void TestBufferStream(); void TestStringStream(); void TestWtrokaInput(); - void TestStrokaInput(); + void TestStrokaInput(); void TestWtrokaOutput(); void TestIStreamOperators(); - void TestReadTo(); + void TestReadTo(); void TestWchar16Output(); void TestWchar32Output(); void TestUtf16StingOutputByChars(); @@ -280,30 +280,30 @@ void TStreamsTest::TestBufferStream() { namespace { class TStringListInput: public IWalkInput { - public: + public: TStringListInput(const TVector<TString>& data) - : Data_(data) - , Index_(0) - { - } - - protected: - size_t DoUnboundedNext(const void** ptr) override { - if (Index_ >= Data_.size()) { - return 0; - } - + : Data_(data) + , Index_(0) + { + } + + protected: + size_t DoUnboundedNext(const void** ptr) override { + if (Index_ >= Data_.size()) { + return 0; + } + const TString& string = Data_[Index_++]; - - *ptr = string.data(); - return string.size(); - } - - private: + + *ptr = string.data(); + return string.size(); + } + + private: const TVector<TString>& Data_; - size_t Index_; - }; - + size_t Index_; + }; + const char Text[] = // UTF8 encoded "one \ntwo\r\nthree\n\tfour\nfive\n" in russian and ... "один \n" @@ -335,18 +335,18 @@ namespace { TString tmp; input.ReadTo(tmp, 'c'); UNIT_ASSERT_VALUES_EQUAL_C(tmp, "111a222b333", comment); - + char tmp2; input.Read(&tmp2, 1); UNIT_ASSERT_VALUES_EQUAL_C(tmp2, '4', comment); - + input.ReadTo(tmp, '6'); UNIT_ASSERT_VALUES_EQUAL_C(tmp, "44d555e", comment); - + tmp = input.ReadAll(); UNIT_ASSERT_VALUES_EQUAL_C(tmp, "66f", comment); } - + void TestStreamReadTo2(IInputStream& input, const char* comment) { TString s; size_t i = 0; @@ -354,30 +354,30 @@ namespace { UNIT_ASSERT_C(i < Y_ARRAY_SIZE(Expected), comment); UNIT_ASSERT_VALUES_EQUAL_C(s, Expected[i], comment); ++i; - } + } } - + void TestStreamReadTo3(IInputStream& input, const char* comment) { UNIT_ASSERT_VALUES_EQUAL_C(input.ReadLine(), "111a222b333c444d555e666f", comment); } - + void TestStreamReadTo4(IInputStream& input, const char* comment) { UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "one", comment); UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "two", comment); UNIT_ASSERT_VALUES_EQUAL_C(input.ReadTo('\0'), "three", comment); } - + void TestStrokaInput(IInputStream& input, const char* comment) { TString line; ui32 i = 0; TInstant start = Now(); while (input.ReadLine(line)) { ++i; - } + } Cout << comment << ":" << (Now() - start).SecondsFloat() << Endl; UNIT_ASSERT_VALUES_EQUAL(i, 100000); } - + template <class T> void TestStreamReadTo(const TString& text, T test) { TStringInput is(text); @@ -396,25 +396,25 @@ namespace { } } -void TStreamsTest::TestReadTo() { - TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo1); - TestStreamReadTo(Text, TestStreamReadTo2); - TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo3); +void TStreamsTest::TestReadTo() { + TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo1); + TestStreamReadTo(Text, TestStreamReadTo2); + TestStreamReadTo("111a222b333c444d555e666f", TestStreamReadTo3); TString withZero = "one"; - withZero.append('\0').append("two").append('\0').append("three"); - TestStreamReadTo(withZero, TestStreamReadTo4); -} - -void TStreamsTest::TestStrokaInput() { + withZero.append('\0').append("two").append('\0').append("three"); + TestStreamReadTo(withZero, TestStreamReadTo4); +} + +void TStreamsTest::TestStrokaInput() { TString s; - for (ui32 i = 0; i < 100000; ++i) { + for (ui32 i = 0; i < 100000; ++i) { TVector<char> d(i % 1000, 'a'); s.append(d.data(), d.size()); - s.append('\n'); - } + s.append('\n'); + } TestStreamReadTo(s, ::TestStrokaInput); -} - +} + void TStreamsTest::TestWtrokaInput() { const TString s(Text); TStringInput is(s); diff --git a/util/stream/mem.cpp b/util/stream/mem.cpp index 22a3339e27..68778a86f2 100644 --- a/util/stream/mem.cpp +++ b/util/stream/mem.cpp @@ -31,7 +31,7 @@ size_t TMemoryInput::DoNext(const void** ptr, size_t len) { return len; } -void TMemoryInput::DoUndo(size_t len) { +void TMemoryInput::DoUndo(size_t len) { Len_ += len; Buf_ -= len; } diff --git a/util/stream/mem.h b/util/stream/mem.h index 18a5d46772..383e49dfd5 100644 --- a/util/stream/mem.h +++ b/util/stream/mem.h @@ -98,7 +98,7 @@ public: private: size_t DoNext(const void** ptr, size_t len) override; - void DoUndo(size_t len) override; + void DoUndo(size_t len) override; private: const char* Buf_; diff --git a/util/stream/str.cpp b/util/stream/str.cpp index 13f0e8ef28..e1b6e72d67 100644 --- a/util/stream/str.cpp +++ b/util/stream/str.cpp @@ -11,11 +11,11 @@ size_t TStringInput::DoNext(const void** ptr, size_t len) { return len; } -void TStringInput::DoUndo(size_t len) { - Y_VERIFY(len <= Pos_); - Pos_ -= len; -} - +void TStringInput::DoUndo(size_t len) { + Y_VERIFY(len <= Pos_); + Pos_ -= len; +} + TStringOutput::~TStringOutput() = default; size_t TStringOutput::DoNext(void** ptr) { diff --git a/util/stream/str.h b/util/stream/str.h index 028bd572c0..89f42c1fd7 100644 --- a/util/stream/str.h +++ b/util/stream/str.h @@ -49,7 +49,7 @@ public: protected: size_t DoNext(const void** ptr, size_t len) override; - void DoUndo(size_t len) override; + void DoUndo(size_t len) override; private: const TString* S_; diff --git a/util/stream/walk.cpp b/util/stream/walk.cpp index 57dc9ab036..47a6cae5f8 100644 --- a/util/stream/walk.cpp +++ b/util/stream/walk.cpp @@ -3,8 +3,8 @@ #include <util/generic/string.h> void IWalkInput::DoUndo(size_t len) { - Len_ += len; - Buf_ = static_cast<const char*>(Buf_) - len; + Len_ += len; + Buf_ = static_cast<const char*>(Buf_) - len; } size_t IWalkInput::DoNext(const void** ptr, size_t len) { diff --git a/util/stream/walk.h b/util/stream/walk.h index 7e62cb44dc..9683943331 100644 --- a/util/stream/walk.h +++ b/util/stream/walk.h @@ -16,7 +16,7 @@ public: } protected: - void DoUndo(size_t len) override; + void DoUndo(size_t len) override; size_t DoNext(const void** ptr, size_t len) override; /** diff --git a/util/stream/zerocopy.cpp b/util/stream/zerocopy.cpp index dc2982ad55..b5f8e58b70 100644 --- a/util/stream/zerocopy.cpp +++ b/util/stream/zerocopy.cpp @@ -31,30 +31,30 @@ size_t IZeroCopyInput::DoSkip(size_t len) { return DoNext(&ptr, len); } - + IZeroCopyInputFastReadTo::~IZeroCopyInputFastReadTo() = default; - + size_t IZeroCopyInputFastReadTo::DoReadTo(TString& st, char ch) { - const char* ptr; - size_t len = Next(&ptr); - if (!len) { - return 0; - } - size_t result = 0; - st.clear(); - do { - if (const char* pos = (const char*)memchr(ptr, ch, len)) { + const char* ptr; + size_t len = Next(&ptr); + if (!len) { + return 0; + } + size_t result = 0; + st.clear(); + do { + if (const char* pos = (const char*)memchr(ptr, ch, len)) { size_t bytesRead = (pos - ptr) + 1; if (bytesRead > 1) { - st.append(ptr, pos); - } + st.append(ptr, pos); + } Undo(len - bytesRead); result += bytesRead; - return result; - } else { - result += len; - st.append(ptr, len); - } - } while (len = Next(&ptr)); - return result; -} + return result; + } else { + result += len; + st.append(ptr, len); + } + } while (len = Next(&ptr)); + return result; +} diff --git a/util/stream/zerocopy.h b/util/stream/zerocopy.h index 3315aa3a51..d44d5e0d77 100644 --- a/util/stream/zerocopy.h +++ b/util/stream/zerocopy.h @@ -56,36 +56,36 @@ protected: virtual size_t DoNext(const void** ptr, size_t len) = 0; }; -/** -* Input stream with direct access to the input buffer and ability to undo read -* -* Derived classes must implement `DoUndo` method. -*/ +/** +* Input stream with direct access to the input buffer and ability to undo read +* +* Derived classes must implement `DoUndo` method. +*/ class IZeroCopyInputFastReadTo: public IZeroCopyInput { -public: +public: IZeroCopyInputFastReadTo() noexcept = default; ~IZeroCopyInputFastReadTo() override; - + IZeroCopyInputFastReadTo(IZeroCopyInputFastReadTo&&) noexcept = default; IZeroCopyInputFastReadTo& operator=(IZeroCopyInputFastReadTo&&) noexcept = default; -protected: +protected: size_t DoReadTo(TString& st, char ch) override; - -private: - /** - * Undo read. - * - * Note that this function not check if you try undo more that read. In fact Undo used for undo read in last chunk. - * - * @param len[in] Bytes to undo. - */ - inline void Undo(size_t len) { - if (len) { - DoUndo(len); - } - } - virtual void DoUndo(size_t len) = 0; -}; - + +private: + /** + * Undo read. + * + * Note that this function not check if you try undo more that read. In fact Undo used for undo read in last chunk. + * + * @param len[in] Bytes to undo. + */ + inline void Undo(size_t len) { + if (len) { + DoUndo(len); + } + } + virtual void DoUndo(size_t len) = 0; +}; + /** @} */ |