diff options
| author | babenko <[email protected]> | 2022-11-13 12:15:50 +0300 | 
|---|---|---|
| committer | babenko <[email protected]> | 2022-11-13 12:15:50 +0300 | 
| commit | f6425222980211ebf68592a1658e87f0fb442972 (patch) | |
| tree | 779d1f4392c258027651ee0b7b97f93464747fd7 /library/cpp | |
| parent | 277cd4ec47419923e9cfabd3ea4726437cbfa53c (diff) | |
Move TChunkedOutputStream to library and make it zero-copy aware
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/yt/memory/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | library/cpp/yt/memory/blob.h | 46 | ||||
| -rw-r--r-- | library/cpp/yt/memory/chunked_output_stream.cpp | 118 | ||||
| -rw-r--r-- | library/cpp/yt/memory/chunked_output_stream.h | 67 | 
4 files changed, 189 insertions, 43 deletions
diff --git a/library/cpp/yt/memory/CMakeLists.txt b/library/cpp/yt/memory/CMakeLists.txt
index ca44ef58cb6..5bcfddec029 100644
--- a/library/cpp/yt/memory/CMakeLists.txt
+++ b/library/cpp/yt/memory/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(cpp-yt-memory PUBLIC
 )
 target_sources(cpp-yt-memory PRIVATE
   ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp
+  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp
   ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp
   ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp
   ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/shared_range.cpp
diff --git a/library/cpp/yt/memory/blob.h b/library/cpp/yt/memory/blob.h
index 99441fb8c97..0a1318c2310 100644
--- a/library/cpp/yt/memory/blob.h
+++ b/library/cpp/yt/memory/blob.h
@@ -21,9 +21,9 @@ class TBlob
 {
 public:
     //! Constructs a blob with a given size.
-    TBlob(
-        TRefCountedTypeCookie tagCookie,
-        size_t size,
+    explicit TBlob(
+        TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultBlobTag>(),
+        size_t size = 0,
         bool initiailizeStorage = true,
         bool pageAligned = false);
 
@@ -33,46 +33,6 @@ public:
         TRef data,
         bool pageAligned = false);
 
-    //! Constructs an empty blob.
-    template <class TTag = TDefaultBlobTag>
-    explicit TBlob(TTag tag = {})
-        : TBlob(tag, 0, true, false)
-    { }
-
-    //! Constructs a blob with a given size.
-    template <class TTag>
-    explicit TBlob(
-        TTag,
-        size_t size,
-        bool initiailizeStorage = true,
-        bool pageAligned = false)
-        : TBlob(
-            GetRefCountedTypeCookie<TTag>(),
-            size,
-            initiailizeStorage,
-            pageAligned)
-    { }
-
-    //! Copies a chunk of memory into a new instance.
-    template <class TTag>
-    TBlob(
-        TTag,
-        TRef data,
-        bool pageAligned = false)
-        : TBlob(
-            GetRefCountedTypeCookie<TTag>(),
-            data,
-            pageAligned)
-    { }
-
-    //! Remind user about the tag argument.
-    TBlob(i32 size, bool initiailizeStorage = true) = delete;
-    TBlob(i64 size, bool initiailizeStorage = true) = delete;
-    TBlob(ui32 size, bool initiailizeStorage = true) = delete;
-    TBlob(ui64 size, bool initiailizeStorage = true) = delete;
-    template <typename T, typename U>
-    TBlob(const T*, U) = delete;
-
     //! Copies the data.
     TBlob(const TBlob& other);
 
diff --git a/library/cpp/yt/memory/chunked_output_stream.cpp b/library/cpp/yt/memory/chunked_output_stream.cpp
new file mode 100644
index 00000000000..90ec36fe23f
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_output_stream.cpp
@@ -0,0 +1,118 @@
+#include "chunked_output_stream.h"
+
+#include <util/system/sanitizers.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TChunkedOutputStream::TChunkedOutputStream(
+    TRefCountedTypeCookie tagCookie,
+    size_t initialReserveSize,
+    size_t maxReserveSize)
+    : MaxReserveSize_(RoundUpToPage(maxReserveSize))
+    , CurrentReserveSize_(RoundUpToPage(initialReserveSize))
+    , CurrentChunk_(tagCookie, /*size*/ 0)
+{
+    YT_VERIFY(MaxReserveSize_ > 0);
+
+    if (CurrentReserveSize_ > MaxReserveSize_) {
+        CurrentReserveSize_ = MaxReserveSize_;
+    }
+}
+
+std::vector<TSharedRef> TChunkedOutputStream::Finish()
+{
+    FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+
+    YT_ASSERT(CurrentChunk_.IsEmpty());
+    FinishedSize_ = 0;
+
+    for (const auto& chunk : FinishedChunks_) {
+        NSan::CheckMemIsInitialized(chunk.Begin(), chunk.Size());
+    }
+
+    return std::move(FinishedChunks_);
+}
+
+size_t TChunkedOutputStream::GetSize() const
+{
+    return FinishedSize_ + CurrentChunk_.Size();
+}
+
+size_t TChunkedOutputStream::GetCapacity() const
+{
+    return FinishedSize_ + CurrentChunk_.Capacity();
+}
+
+void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired)
+{
+    YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity());
+    FinishedSize_ += CurrentChunk_.Size();
+    FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+    CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
+    CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_));
+}
+
+void TChunkedOutputStream::DoWrite(const void* buffer, size_t length)
+{
+    if (CurrentChunk_.Capacity() == 0) {
+        CurrentChunk_.Reserve(CurrentReserveSize_);
+    }
+
+    auto spaceAvailable = std::min(length, CurrentChunk_.Capacity() - CurrentChunk_.Size());
+    CurrentChunk_.Append(buffer, spaceAvailable);
+
+    auto spaceRequired = length - spaceAvailable;
+    if (spaceRequired > 0) {
+        ReserveNewChunk(spaceRequired);
+        CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired);
+    }
+}
+
+size_t TChunkedOutputStream::DoNext(void** ptr)
+{
+    if (CurrentChunk_.Size() == CurrentChunk_.Capacity()) {
+        if (CurrentChunk_.Capacity() == 0) {
+            CurrentChunk_.Reserve(CurrentReserveSize_);
+        } else {
+            ReserveNewChunk(0);
+        }
+    }
+
+    auto spaceAvailable = CurrentChunk_.Capacity() - CurrentChunk_.Size();
+    YT_ASSERT(spaceAvailable > 0);
+    *ptr = CurrentChunk_.End();
+    CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false);
+    return spaceAvailable;
+}
+
+void TChunkedOutputStream::DoUndo(size_t len)
+{
+    YT_VERIFY(CurrentChunk_.Size() >= len);
+    CurrentChunk_.Resize(CurrentChunk_.Size() - len);
+}
+
+char* TChunkedOutputStream::Preallocate(size_t size)
+{
+    size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size();
+    if (available < size) {
+        FinishedSize_ += CurrentChunk_.Size();
+        FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+
+        CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
+
+        CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_));
+    }
+    return CurrentChunk_.End();
+}
+
+void TChunkedOutputStream::Advance(size_t size)
+{
+    YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity());
+    CurrentChunk_.Resize(CurrentChunk_.Size() + size, false);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_output_stream.h b/library/cpp/yt/memory/chunked_output_stream.h
new file mode 100644
index 00000000000..57bf6c318df
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_output_stream.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include "blob.h"
+
+#include <library/cpp/yt/memory/ref.h>
+
+#include <util/stream/zerocopy_output.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TDefaultChunkedOutputStreamTag
+{ };
+
+class TChunkedOutputStream
+    : public IZeroCopyOutput
+{
+public:
+    explicit TChunkedOutputStream(
+        TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedOutputStreamTag>(),
+        size_t initialReserveSize = 4_KB,
+        size_t maxReserveSize = 64_KB);
+
+    TChunkedOutputStream(TChunkedOutputStream&&) = default;
+    TChunkedOutputStream& operator=(TChunkedOutputStream&&) = default;
+
+    //! Returns a sequence of written chunks.
+    //! The stream is no longer usable after this call.
+    std::vector<TSharedRef> Finish();
+
+    //! Returns the number of bytes actually written.
+    size_t GetSize() const;
+
+    //! Returns the number of bytes actually written plus unused capacity in the
+    //! last chunk.
+    size_t GetCapacity() const;
+
+    //! Returns a pointer to a contiguous memory block of a given #size.
+    //! Do not forget to call #Advance after use.
+    char* Preallocate(size_t size);
+
+    //! Marks #size bytes (which were previously preallocated) as used.
+    void Advance(size_t size);
+
+private:
+    size_t MaxReserveSize_;
+    size_t CurrentReserveSize_;
+
+    size_t FinishedSize_ = 0;
+
+    TBlob CurrentChunk_;
+    std::vector<TSharedRef> FinishedChunks_;
+
+
+    void ReserveNewChunk(size_t spaceRequired);
+
+    void DoWrite(const void* buf, size_t len) override;
+    size_t DoNext(void** ptr) override;
+    void DoUndo(size_t len) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
