aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2022-11-13 12:15:50 +0300
committerbabenko <babenko@yandex-team.com>2022-11-13 12:15:50 +0300
commitf6425222980211ebf68592a1658e87f0fb442972 (patch)
tree779d1f4392c258027651ee0b7b97f93464747fd7 /library/cpp
parent277cd4ec47419923e9cfabd3ea4726437cbfa53c (diff)
downloadydb-f6425222980211ebf68592a1658e87f0fb442972.tar.gz
Move TChunkedOutputStream to library and make it zero-copy aware
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/yt/memory/CMakeLists.txt1
-rw-r--r--library/cpp/yt/memory/blob.h46
-rw-r--r--library/cpp/yt/memory/chunked_output_stream.cpp118
-rw-r--r--library/cpp/yt/memory/chunked_output_stream.h67
4 files changed, 189 insertions, 43 deletions
diff --git a/library/cpp/yt/memory/CMakeLists.txt b/library/cpp/yt/memory/CMakeLists.txt
index ca44ef58cb6..5bcfddec029 100644
--- a/library/cpp/yt/memory/CMakeLists.txt
+++ b/library/cpp/yt/memory/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(cpp-yt-memory PUBLIC
)
target_sources(cpp-yt-memory PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp
${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/shared_range.cpp
diff --git a/library/cpp/yt/memory/blob.h b/library/cpp/yt/memory/blob.h
index 99441fb8c97..0a1318c2310 100644
--- a/library/cpp/yt/memory/blob.h
+++ b/library/cpp/yt/memory/blob.h
@@ -21,9 +21,9 @@ class TBlob
{
public:
//! Constructs a blob with a given size.
- TBlob(
- TRefCountedTypeCookie tagCookie,
- size_t size,
+ explicit TBlob(
+ TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultBlobTag>(),
+ size_t size = 0,
bool initiailizeStorage = true,
bool pageAligned = false);
@@ -33,46 +33,6 @@ public:
TRef data,
bool pageAligned = false);
- //! Constructs an empty blob.
- template <class TTag = TDefaultBlobTag>
- explicit TBlob(TTag tag = {})
- : TBlob(tag, 0, true, false)
- { }
-
- //! Constructs a blob with a given size.
- template <class TTag>
- explicit TBlob(
- TTag,
- size_t size,
- bool initiailizeStorage = true,
- bool pageAligned = false)
- : TBlob(
- GetRefCountedTypeCookie<TTag>(),
- size,
- initiailizeStorage,
- pageAligned)
- { }
-
- //! Copies a chunk of memory into a new instance.
- template <class TTag>
- TBlob(
- TTag,
- TRef data,
- bool pageAligned = false)
- : TBlob(
- GetRefCountedTypeCookie<TTag>(),
- data,
- pageAligned)
- { }
-
- //! Remind user about the tag argument.
- TBlob(i32 size, bool initiailizeStorage = true) = delete;
- TBlob(i64 size, bool initiailizeStorage = true) = delete;
- TBlob(ui32 size, bool initiailizeStorage = true) = delete;
- TBlob(ui64 size, bool initiailizeStorage = true) = delete;
- template <typename T, typename U>
- TBlob(const T*, U) = delete;
-
//! Copies the data.
TBlob(const TBlob& other);
diff --git a/library/cpp/yt/memory/chunked_output_stream.cpp b/library/cpp/yt/memory/chunked_output_stream.cpp
new file mode 100644
index 00000000000..90ec36fe23f
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_output_stream.cpp
@@ -0,0 +1,118 @@
+#include "chunked_output_stream.h"
+
+#include <util/system/sanitizers.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TChunkedOutputStream::TChunkedOutputStream(
+ TRefCountedTypeCookie tagCookie,
+ size_t initialReserveSize,
+ size_t maxReserveSize)
+ : MaxReserveSize_(RoundUpToPage(maxReserveSize))
+ , CurrentReserveSize_(RoundUpToPage(initialReserveSize))
+ , CurrentChunk_(tagCookie, /*size*/ 0)
+{
+ YT_VERIFY(MaxReserveSize_ > 0);
+
+ if (CurrentReserveSize_ > MaxReserveSize_) {
+ CurrentReserveSize_ = MaxReserveSize_;
+ }
+}
+
+std::vector<TSharedRef> TChunkedOutputStream::Finish()
+{
+ FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+
+ YT_ASSERT(CurrentChunk_.IsEmpty());
+ FinishedSize_ = 0;
+
+ for (const auto& chunk : FinishedChunks_) {
+ NSan::CheckMemIsInitialized(chunk.Begin(), chunk.Size());
+ }
+
+ return std::move(FinishedChunks_);
+}
+
+size_t TChunkedOutputStream::GetSize() const
+{
+ return FinishedSize_ + CurrentChunk_.Size();
+}
+
+size_t TChunkedOutputStream::GetCapacity() const
+{
+ return FinishedSize_ + CurrentChunk_.Capacity();
+}
+
+void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired)
+{
+ YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity());
+ FinishedSize_ += CurrentChunk_.Size();
+ FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+ CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
+ CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_));
+}
+
+void TChunkedOutputStream::DoWrite(const void* buffer, size_t length)
+{
+ if (CurrentChunk_.Capacity() == 0) {
+ CurrentChunk_.Reserve(CurrentReserveSize_);
+ }
+
+ auto spaceAvailable = std::min(length, CurrentChunk_.Capacity() - CurrentChunk_.Size());
+ CurrentChunk_.Append(buffer, spaceAvailable);
+
+ auto spaceRequired = length - spaceAvailable;
+ if (spaceRequired > 0) {
+ ReserveNewChunk(spaceRequired);
+ CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired);
+ }
+}
+
+size_t TChunkedOutputStream::DoNext(void** ptr)
+{
+ if (CurrentChunk_.Size() == CurrentChunk_.Capacity()) {
+ if (CurrentChunk_.Capacity() == 0) {
+ CurrentChunk_.Reserve(CurrentReserveSize_);
+ } else {
+ ReserveNewChunk(0);
+ }
+ }
+
+ auto spaceAvailable = CurrentChunk_.Capacity() - CurrentChunk_.Size();
+ YT_ASSERT(spaceAvailable > 0);
+ *ptr = CurrentChunk_.End();
+ CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false);
+ return spaceAvailable;
+}
+
+void TChunkedOutputStream::DoUndo(size_t len)
+{
+ YT_VERIFY(CurrentChunk_.Size() >= len);
+ CurrentChunk_.Resize(CurrentChunk_.Size() - len);
+}
+
+char* TChunkedOutputStream::Preallocate(size_t size)
+{
+ size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size();
+ if (available < size) {
+ FinishedSize_ += CurrentChunk_.Size();
+ FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+
+ CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
+
+ CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_));
+ }
+ return CurrentChunk_.End();
+}
+
+void TChunkedOutputStream::Advance(size_t size)
+{
+ YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity());
+ CurrentChunk_.Resize(CurrentChunk_.Size() + size, false);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/chunked_output_stream.h b/library/cpp/yt/memory/chunked_output_stream.h
new file mode 100644
index 00000000000..57bf6c318df
--- /dev/null
+++ b/library/cpp/yt/memory/chunked_output_stream.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include "blob.h"
+
+#include <library/cpp/yt/memory/ref.h>
+
+#include <util/stream/zerocopy_output.h>
+
+#include <util/generic/size_literals.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TDefaultChunkedOutputStreamTag
+{ };
+
+class TChunkedOutputStream
+ : public IZeroCopyOutput
+{
+public:
+ explicit TChunkedOutputStream(
+ TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedOutputStreamTag>(),
+ size_t initialReserveSize = 4_KB,
+ size_t maxReserveSize = 64_KB);
+
+ TChunkedOutputStream(TChunkedOutputStream&&) = default;
+ TChunkedOutputStream& operator=(TChunkedOutputStream&&) = default;
+
+ //! Returns a sequence of written chunks.
+ //! The stream is no longer usable after this call.
+ std::vector<TSharedRef> Finish();
+
+ //! Returns the number of bytes actually written.
+ size_t GetSize() const;
+
+ //! Returns the number of bytes actually written plus unused capacity in the
+ //! last chunk.
+ size_t GetCapacity() const;
+
+ //! Returns a pointer to a contiguous memory block of a given #size.
+ //! Do not forget to call #Advance after use.
+ char* Preallocate(size_t size);
+
+ //! Marks #size bytes (which were previously preallocated) as used.
+ void Advance(size_t size);
+
+private:
+ size_t MaxReserveSize_;
+ size_t CurrentReserveSize_;
+
+ size_t FinishedSize_ = 0;
+
+ TBlob CurrentChunk_;
+ std::vector<TSharedRef> FinishedChunks_;
+
+
+ void ReserveNewChunk(size_t spaceRequired);
+
+ void DoWrite(const void* buf, size_t len) override;
+ size_t DoNext(void** ptr) override;
+ void DoUndo(size_t len) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT