diff options
| author | nadya02 <[email protected]> | 2025-06-11 17:56:39 +0300 |
|---|---|---|
| committer | nadya02 <[email protected]> | 2025-06-11 18:25:18 +0300 |
| commit | b82002b0fabe461ff9aee40ca37a129c5e7cb263 (patch) | |
| tree | d8460ee0dcb6aeac494b92f19f2b6e12f175fa5b /library/cpp/yt/memory/chunked_output_stream.cpp | |
| parent | f4b58920234346ad15c0ba9de5b9ab66d90c425c (diff) | |
Restore measuring the memory usage of write-table queries.
commit_hash:1d28ad476551aee1d5ab64c72205347128c27c68
Diffstat (limited to 'library/cpp/yt/memory/chunked_output_stream.cpp')
| -rw-r--r-- | library/cpp/yt/memory/chunked_output_stream.cpp | 31 |
1 files changed, 27 insertions, 4 deletions
diff --git a/library/cpp/yt/memory/chunked_output_stream.cpp b/library/cpp/yt/memory/chunked_output_stream.cpp index 90ec36fe23f..bf62412e00d 100644 --- a/library/cpp/yt/memory/chunked_output_stream.cpp +++ b/library/cpp/yt/memory/chunked_output_stream.cpp @@ -8,9 +8,12 @@ namespace NYT { TChunkedOutputStream::TChunkedOutputStream( TRefCountedTypeCookie tagCookie, + ISimpleMemoryUsageTrackerPtr memoryUsageTracker, size_t initialReserveSize, size_t maxReserveSize) - : MaxReserveSize_(RoundUpToPage(maxReserveSize)) + : MemoryUsageTracker_(std::move(memoryUsageTracker)) + , CurrentChunkMemoryUsageGuard_(TSimpleMemoryUsageTrackerGuard::Build(MemoryUsageTracker_)) + , MaxReserveSize_(RoundUpToPage(maxReserveSize)) , CurrentReserveSize_(RoundUpToPage(initialReserveSize)) , CurrentChunk_(tagCookie, /*size*/ 0) { @@ -23,7 +26,11 @@ TChunkedOutputStream::TChunkedOutputStream( std::vector<TSharedRef> TChunkedOutputStream::Finish() { - FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + FinishedChunks_.push_back(TrackMemory( + MemoryUsageTracker_, + TSharedRef::FromBlob(std::move(CurrentChunk_)))); + + CurrentChunkMemoryUsageGuard_.Release(); YT_ASSERT(CurrentChunk_.IsEmpty()); FinishedSize_ = 0; @@ -49,9 +56,13 @@ void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired) { YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity()); FinishedSize_ += CurrentChunk_.Size(); - FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + FinishedChunks_.push_back(TrackMemory( + MemoryUsageTracker_, + TSharedRef::FromBlob(std::move(CurrentChunk_)))); + CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_); CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_)); + UpdateCurrentChunkMemoryUsage(); } void TChunkedOutputStream::DoWrite(const void* buffer, size_t length) @@ -68,6 +79,7 @@ void TChunkedOutputStream::DoWrite(const void* buffer, size_t length) ReserveNewChunk(spaceRequired); CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired); } + UpdateCurrentChunkMemoryUsage(); } size_t TChunkedOutputStream::DoNext(void** ptr) @@ -84,6 +96,7 @@ size_t TChunkedOutputStream::DoNext(void** ptr) YT_ASSERT(spaceAvailable > 0); *ptr = CurrentChunk_.End(); CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false); + UpdateCurrentChunkMemoryUsage(); return spaceAvailable; } @@ -91,6 +104,7 @@ void TChunkedOutputStream::DoUndo(size_t len) { YT_VERIFY(CurrentChunk_.Size() >= len); CurrentChunk_.Resize(CurrentChunk_.Size() - len); + UpdateCurrentChunkMemoryUsage(); } char* TChunkedOutputStream::Preallocate(size_t size) @@ -98,12 +112,15 @@ char* TChunkedOutputStream::Preallocate(size_t size) size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size(); if (available < size) { FinishedSize_ += CurrentChunk_.Size(); - FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_))); + FinishedChunks_.push_back(TrackMemory( + MemoryUsageTracker_, + TSharedRef::FromBlob(std::move(CurrentChunk_)))); CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_); CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_)); } + UpdateCurrentChunkMemoryUsage(); return CurrentChunk_.End(); } @@ -111,6 +128,12 @@ void TChunkedOutputStream::Advance(size_t size) { YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity()); CurrentChunk_.Resize(CurrentChunk_.Size() + size, false); + UpdateCurrentChunkMemoryUsage(); +} + +void TChunkedOutputStream::UpdateCurrentChunkMemoryUsage() +{ + CurrentChunkMemoryUsageGuard_.SetSize(CurrentChunk_.Capacity()); } //////////////////////////////////////////////////////////////////////////////// |
