diff options
author | nadya02 <nadya02@yandex-team.com> | 2025-03-17 17:42:13 +0300 |
---|---|---|
committer | nadya02 <nadya02@yandex-team.com> | 2025-03-17 18:56:09 +0300 |
commit | 6a0982f0defdc11f0b9d19504da4f4915a8b731c (patch) | |
tree | b72940dcdfe0511616b4fe8b0aa6cace6f2850a6 /library/cpp | |
parent | 6b21f0878820fcc53b97c22c8c18b0804e453a74 (diff) | |
download | ydb-6a0982f0defdc11f0b9d19504da4f4915a8b731c.tar.gz |
YT-23989: Track memory in TChunkedOutputStream
commit_hash:1fcce66b4e0a0a5e5fb55aba38889e9bf5b42a85
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/yt/memory/chunked_output_stream.cpp | 23 | ||||
-rw-r--r-- | library/cpp/yt/memory/chunked_output_stream.h | 7 |
2 files changed, 26 insertions, 4 deletions
diff --git a/library/cpp/yt/memory/chunked_output_stream.cpp b/library/cpp/yt/memory/chunked_output_stream.cpp index 90ec36fe23f..d2654209477 100644 --- a/library/cpp/yt/memory/chunked_output_stream.cpp +++ b/library/cpp/yt/memory/chunked_output_stream.cpp @@ -8,9 +8,12 @@ namespace NYT { TChunkedOutputStream::TChunkedOutputStream( TRefCountedTypeCookie tagCookie, + IMemoryUsageTrackerPtr memoryUsageTracker, size_t initialReserveSize, size_t maxReserveSize) - : MaxReserveSize_(RoundUpToPage(maxReserveSize)) + : MemoryUsageTracker_(std::move(memoryUsageTracker)) + , CurrentChunkMemoryUsageGuard_(TMemoryUsageTrackerGuard::Build(MemoryUsageTracker_)) + , MaxReserveSize_(RoundUpToPage(maxReserveSize)) , CurrentReserveSize_(RoundUpToPage(initialReserveSize)) , CurrentChunk_(tagCookie, /*size*/ 0) { @@ -23,7 +26,8 @@ TChunkedOutputStream::TChunkedOutputStream( std::vector<TSharedRef> TChunkedOutputStream::Finish() { - FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + FinishedChunks_.push_back(TrackMemory(MemoryUsageTracker_, TSharedRef::FromBlob(std::move(CurrentChunk_)))); + CurrentChunkMemoryUsageGuard_.Release(); YT_ASSERT(CurrentChunk_.IsEmpty()); FinishedSize_ = 0; @@ -49,9 +53,10 @@ void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired) { YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity()); FinishedSize_ += CurrentChunk_.Size(); - FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + FinishedChunks_.push_back(TrackMemory(MemoryUsageTracker_, TSharedRef::FromBlob(std::move(CurrentChunk_)))); CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_); CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_)); + UpdateCurrentChunkMemoryUsage(); } void TChunkedOutputStream::DoWrite(const void* buffer, size_t length) @@ -68,6 +73,7 @@ void TChunkedOutputStream::DoWrite(const void* buffer, size_t length) ReserveNewChunk(spaceRequired); CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired); } + UpdateCurrentChunkMemoryUsage(); } size_t TChunkedOutputStream::DoNext(void** ptr) @@ -84,6 +90,7 @@ size_t TChunkedOutputStream::DoNext(void** ptr) YT_ASSERT(spaceAvailable > 0); *ptr = CurrentChunk_.End(); CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false); + UpdateCurrentChunkMemoryUsage(); return spaceAvailable; } @@ -91,6 +98,7 @@ void TChunkedOutputStream::DoUndo(size_t len) { YT_VERIFY(CurrentChunk_.Size() >= len); CurrentChunk_.Resize(CurrentChunk_.Size() - len); + UpdateCurrentChunkMemoryUsage(); } char* TChunkedOutputStream::Preallocate(size_t size) @@ -98,12 +106,13 @@ char* TChunkedOutputStream::Preallocate(size_t size) size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size(); if (available < size) { FinishedSize_ += CurrentChunk_.Size(); - FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + FinishedChunks_.push_back(TrackMemory(MemoryUsageTracker_, TSharedRef::FromBlob(std::move(CurrentChunk_)))); CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_); CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_)); } + UpdateCurrentChunkMemoryUsage(); return CurrentChunk_.End(); } @@ -111,6 +120,12 @@ void TChunkedOutputStream::Advance(size_t size) { YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity()); CurrentChunk_.Resize(CurrentChunk_.Size() + size, false); + UpdateCurrentChunkMemoryUsage(); +} + +void TChunkedOutputStream::UpdateCurrentChunkMemoryUsage() +{ + CurrentChunkMemoryUsageGuard_.SetSize(CurrentChunk_.Capacity()); } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/memory/chunked_output_stream.h b/library/cpp/yt/memory/chunked_output_stream.h index 57bf6c318df..32a8091e65e 100644 --- a/library/cpp/yt/memory/chunked_output_stream.h +++ b/library/cpp/yt/memory/chunked_output_stream.h @@ -1,6 +1,7 @@ #pragma once #include "blob.h" +#include "memory_usage_tracker.h" #include <library/cpp/yt/memory/ref.h> @@ -21,6 +22,7 @@ class TChunkedOutputStream public: explicit TChunkedOutputStream( TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedOutputStreamTag>(), + IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker(), size_t initialReserveSize = 4_KB, size_t maxReserveSize = 64_KB); @@ -46,6 +48,9 @@ public: void Advance(size_t size); private: + const IMemoryUsageTrackerPtr MemoryUsageTracker_; + + TMemoryUsageTrackerGuard CurrentChunkMemoryUsageGuard_; size_t MaxReserveSize_; size_t CurrentReserveSize_; @@ -57,6 +62,8 @@ private: void ReserveNewChunk(size_t spaceRequired); + void UpdateCurrentChunkMemoryUsage(); + void DoWrite(const void* buf, size_t len) override; size_t DoNext(void** ptr) override; void DoUndo(size_t len) override; |