diff options
author | babenko <babenko@yandex-team.com> | 2022-11-13 12:15:50 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2022-11-13 12:15:50 +0300 |
commit | f6425222980211ebf68592a1658e87f0fb442972 (patch) | |
tree | 779d1f4392c258027651ee0b7b97f93464747fd7 /library/cpp | |
parent | 277cd4ec47419923e9cfabd3ea4726437cbfa53c (diff) | |
download | ydb-f6425222980211ebf68592a1658e87f0fb442972.tar.gz |
Move TChunkedOutputStream to library and make it zero-copy aware
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/yt/memory/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/yt/memory/blob.h | 46 | ||||
-rw-r--r-- | library/cpp/yt/memory/chunked_output_stream.cpp | 118 | ||||
-rw-r--r-- | library/cpp/yt/memory/chunked_output_stream.h | 67 |
4 files changed, 189 insertions, 43 deletions
diff --git a/library/cpp/yt/memory/CMakeLists.txt b/library/cpp/yt/memory/CMakeLists.txt index ca44ef58cb6..5bcfddec029 100644 --- a/library/cpp/yt/memory/CMakeLists.txt +++ b/library/cpp/yt/memory/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(cpp-yt-memory PUBLIC ) target_sources(cpp-yt-memory PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/shared_range.cpp diff --git a/library/cpp/yt/memory/blob.h b/library/cpp/yt/memory/blob.h index 99441fb8c97..0a1318c2310 100644 --- a/library/cpp/yt/memory/blob.h +++ b/library/cpp/yt/memory/blob.h @@ -21,9 +21,9 @@ class TBlob { public: //! Constructs a blob with a given size. - TBlob( - TRefCountedTypeCookie tagCookie, - size_t size, + explicit TBlob( + TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultBlobTag>(), + size_t size = 0, bool initiailizeStorage = true, bool pageAligned = false); @@ -33,46 +33,6 @@ public: TRef data, bool pageAligned = false); - //! Constructs an empty blob. - template <class TTag = TDefaultBlobTag> - explicit TBlob(TTag tag = {}) - : TBlob(tag, 0, true, false) - { } - - //! Constructs a blob with a given size. - template <class TTag> - explicit TBlob( - TTag, - size_t size, - bool initiailizeStorage = true, - bool pageAligned = false) - : TBlob( - GetRefCountedTypeCookie<TTag>(), - size, - initiailizeStorage, - pageAligned) - { } - - //! Copies a chunk of memory into a new instance. - template <class TTag> - TBlob( - TTag, - TRef data, - bool pageAligned = false) - : TBlob( - GetRefCountedTypeCookie<TTag>(), - data, - pageAligned) - { } - - //! Remind user about the tag argument. - TBlob(i32 size, bool initiailizeStorage = true) = delete; - TBlob(i64 size, bool initiailizeStorage = true) = delete; - TBlob(ui32 size, bool initiailizeStorage = true) = delete; - TBlob(ui64 size, bool initiailizeStorage = true) = delete; - template <typename T, typename U> - TBlob(const T*, U) = delete; - //! Copies the data. TBlob(const TBlob& other); diff --git a/library/cpp/yt/memory/chunked_output_stream.cpp b/library/cpp/yt/memory/chunked_output_stream.cpp new file mode 100644 index 00000000000..90ec36fe23f --- /dev/null +++ b/library/cpp/yt/memory/chunked_output_stream.cpp @@ -0,0 +1,118 @@ +#include "chunked_output_stream.h" + +#include <util/system/sanitizers.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedOutputStream::TChunkedOutputStream( + TRefCountedTypeCookie tagCookie, + size_t initialReserveSize, + size_t maxReserveSize) + : MaxReserveSize_(RoundUpToPage(maxReserveSize)) + , CurrentReserveSize_(RoundUpToPage(initialReserveSize)) + , CurrentChunk_(tagCookie, /*size*/ 0) +{ + YT_VERIFY(MaxReserveSize_ > 0); + + if (CurrentReserveSize_ > MaxReserveSize_) { + CurrentReserveSize_ = MaxReserveSize_; + } +} + +std::vector<TSharedRef> TChunkedOutputStream::Finish() +{ + FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + + YT_ASSERT(CurrentChunk_.IsEmpty()); + FinishedSize_ = 0; + + for (const auto& chunk : FinishedChunks_) { + NSan::CheckMemIsInitialized(chunk.Begin(), chunk.Size()); + } + + return std::move(FinishedChunks_); +} + +size_t TChunkedOutputStream::GetSize() const +{ + return FinishedSize_ + CurrentChunk_.Size(); +} + +size_t TChunkedOutputStream::GetCapacity() const +{ + return FinishedSize_ + CurrentChunk_.Capacity(); +} + +void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired) +{ + YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity()); + FinishedSize_ += CurrentChunk_.Size(); + FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_); + CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_)); +} + +void TChunkedOutputStream::DoWrite(const void* buffer, size_t length) +{ + if (CurrentChunk_.Capacity() == 0) { + CurrentChunk_.Reserve(CurrentReserveSize_); + } + + auto spaceAvailable = std::min(length, CurrentChunk_.Capacity() - CurrentChunk_.Size()); + CurrentChunk_.Append(buffer, spaceAvailable); + + auto spaceRequired = length - spaceAvailable; + if (spaceRequired > 0) { + ReserveNewChunk(spaceRequired); + CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired); + } +} + +size_t TChunkedOutputStream::DoNext(void** ptr) +{ + if (CurrentChunk_.Size() == CurrentChunk_.Capacity()) { + if (CurrentChunk_.Capacity() == 0) { + CurrentChunk_.Reserve(CurrentReserveSize_); + } else { + ReserveNewChunk(0); + } + } + + auto spaceAvailable = CurrentChunk_.Capacity() - CurrentChunk_.Size(); + YT_ASSERT(spaceAvailable > 0); + *ptr = CurrentChunk_.End(); + CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false); + return spaceAvailable; +} + +void TChunkedOutputStream::DoUndo(size_t len) +{ + YT_VERIFY(CurrentChunk_.Size() >= len); + CurrentChunk_.Resize(CurrentChunk_.Size() - len); +} + +char* TChunkedOutputStream::Preallocate(size_t size) +{ + size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size(); + if (available < size) { + FinishedSize_ += CurrentChunk_.Size(); + FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + + CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_); + + CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_)); + } + return CurrentChunk_.End(); +} + +void TChunkedOutputStream::Advance(size_t size) +{ + YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity()); + CurrentChunk_.Resize(CurrentChunk_.Size() + size, false); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_output_stream.h b/library/cpp/yt/memory/chunked_output_stream.h new file mode 100644 index 00000000000..57bf6c318df --- /dev/null +++ b/library/cpp/yt/memory/chunked_output_stream.h @@ -0,0 +1,67 @@ +#pragma once + +#include "blob.h" + +#include <library/cpp/yt/memory/ref.h> + +#include <util/stream/zerocopy_output.h> + +#include <util/generic/size_literals.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +struct TDefaultChunkedOutputStreamTag +{ }; + +class TChunkedOutputStream + : public IZeroCopyOutput +{ +public: + explicit TChunkedOutputStream( + TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedOutputStreamTag>(), + size_t initialReserveSize = 4_KB, + size_t maxReserveSize = 64_KB); + + TChunkedOutputStream(TChunkedOutputStream&&) = default; + TChunkedOutputStream& operator=(TChunkedOutputStream&&) = default; + + //! Returns a sequence of written chunks. + //! The stream is no longer usable after this call. + std::vector<TSharedRef> Finish(); + + //! Returns the number of bytes actually written. + size_t GetSize() const; + + //! Returns the number of bytes actually written plus unused capacity in the + //! last chunk. + size_t GetCapacity() const; + + //! Returns a pointer to a contiguous memory block of a given #size. + //! Do not forget to call #Advance after use. + char* Preallocate(size_t size); + + //! Marks #size bytes (which were previously preallocated) as used. + void Advance(size_t size); + +private: + size_t MaxReserveSize_; + size_t CurrentReserveSize_; + + size_t FinishedSize_ = 0; + + TBlob CurrentChunk_; + std::vector<TSharedRef> FinishedChunks_; + + + void ReserveNewChunk(size_t spaceRequired); + + void DoWrite(const void* buf, size_t len) override; + size_t DoNext(void** ptr) override; + void DoUndo(size_t len) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |