aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authornadya02 <nadya02@yandex-team.com>2025-03-17 17:42:13 +0300
committernadya02 <nadya02@yandex-team.com>2025-03-17 18:56:09 +0300
commit6a0982f0defdc11f0b9d19504da4f4915a8b731c (patch)
treeb72940dcdfe0511616b4fe8b0aa6cace6f2850a6 /library/cpp
parent6b21f0878820fcc53b97c22c8c18b0804e453a74 (diff)
downloadydb-6a0982f0defdc11f0b9d19504da4f4915a8b731c.tar.gz
YT-23989: Track memory in TChunkedOutputStream
commit_hash:1fcce66b4e0a0a5e5fb55aba38889e9bf5b42a85
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/yt/memory/chunked_output_stream.cpp23
-rw-r--r--library/cpp/yt/memory/chunked_output_stream.h7
2 files changed, 26 insertions, 4 deletions
diff --git a/library/cpp/yt/memory/chunked_output_stream.cpp b/library/cpp/yt/memory/chunked_output_stream.cpp
index 90ec36fe23f..d2654209477 100644
--- a/library/cpp/yt/memory/chunked_output_stream.cpp
+++ b/library/cpp/yt/memory/chunked_output_stream.cpp
@@ -8,9 +8,12 @@ namespace NYT {
TChunkedOutputStream::TChunkedOutputStream(
TRefCountedTypeCookie tagCookie,
+ IMemoryUsageTrackerPtr memoryUsageTracker,
size_t initialReserveSize,
size_t maxReserveSize)
- : MaxReserveSize_(RoundUpToPage(maxReserveSize))
+ : MemoryUsageTracker_(std::move(memoryUsageTracker))
+ , CurrentChunkMemoryUsageGuard_(TMemoryUsageTrackerGuard::Build(MemoryUsageTracker_))
+ , MaxReserveSize_(RoundUpToPage(maxReserveSize))
, CurrentReserveSize_(RoundUpToPage(initialReserveSize))
, CurrentChunk_(tagCookie, /*size*/ 0)
{
@@ -23,7 +26,8 @@ TChunkedOutputStream::TChunkedOutputStream(
std::vector<TSharedRef> TChunkedOutputStream::Finish()
{
- FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+ FinishedChunks_.push_back(TrackMemory(MemoryUsageTracker_, TSharedRef::FromBlob(std::move(CurrentChunk_))));
+ CurrentChunkMemoryUsageGuard_.Release();
YT_ASSERT(CurrentChunk_.IsEmpty());
FinishedSize_ = 0;
@@ -49,9 +53,10 @@ void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired)
{
YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity());
FinishedSize_ += CurrentChunk_.Size();
- FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+ FinishedChunks_.push_back(TrackMemory(MemoryUsageTracker_, TSharedRef::FromBlob(std::move(CurrentChunk_))));
CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_));
+ UpdateCurrentChunkMemoryUsage();
}
void TChunkedOutputStream::DoWrite(const void* buffer, size_t length)
@@ -68,6 +73,7 @@ void TChunkedOutputStream::DoWrite(const void* buffer, size_t length)
ReserveNewChunk(spaceRequired);
CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired);
}
+ UpdateCurrentChunkMemoryUsage();
}
size_t TChunkedOutputStream::DoNext(void** ptr)
@@ -84,6 +90,7 @@ size_t TChunkedOutputStream::DoNext(void** ptr)
YT_ASSERT(spaceAvailable > 0);
*ptr = CurrentChunk_.End();
CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false);
+ UpdateCurrentChunkMemoryUsage();
return spaceAvailable;
}
@@ -91,6 +98,7 @@ void TChunkedOutputStream::DoUndo(size_t len)
{
YT_VERIFY(CurrentChunk_.Size() >= len);
CurrentChunk_.Resize(CurrentChunk_.Size() - len);
+ UpdateCurrentChunkMemoryUsage();
}
char* TChunkedOutputStream::Preallocate(size_t size)
@@ -98,12 +106,13 @@ char* TChunkedOutputStream::Preallocate(size_t size)
size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size();
if (available < size) {
FinishedSize_ += CurrentChunk_.Size();
- FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
+ FinishedChunks_.push_back(TrackMemory(MemoryUsageTracker_, TSharedRef::FromBlob(std::move(CurrentChunk_))));
CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_));
}
+ UpdateCurrentChunkMemoryUsage();
return CurrentChunk_.End();
}
@@ -111,6 +120,12 @@ void TChunkedOutputStream::Advance(size_t size)
{
YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity());
CurrentChunk_.Resize(CurrentChunk_.Size() + size, false);
+ UpdateCurrentChunkMemoryUsage();
+}
+
+void TChunkedOutputStream::UpdateCurrentChunkMemoryUsage()
+{
+ CurrentChunkMemoryUsageGuard_.SetSize(CurrentChunk_.Capacity());
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/memory/chunked_output_stream.h b/library/cpp/yt/memory/chunked_output_stream.h
index 57bf6c318df..32a8091e65e 100644
--- a/library/cpp/yt/memory/chunked_output_stream.h
+++ b/library/cpp/yt/memory/chunked_output_stream.h
@@ -1,6 +1,7 @@
#pragma once
#include "blob.h"
+#include "memory_usage_tracker.h"
#include <library/cpp/yt/memory/ref.h>
@@ -21,6 +22,7 @@ class TChunkedOutputStream
public:
explicit TChunkedOutputStream(
TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedOutputStreamTag>(),
+ IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker(),
size_t initialReserveSize = 4_KB,
size_t maxReserveSize = 64_KB);
@@ -46,6 +48,9 @@ public:
void Advance(size_t size);
private:
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_;
+
+ TMemoryUsageTrackerGuard CurrentChunkMemoryUsageGuard_;
size_t MaxReserveSize_;
size_t CurrentReserveSize_;
@@ -57,6 +62,8 @@ private:
void ReserveNewChunk(size_t spaceRequired);
+ void UpdateCurrentChunkMemoryUsage();
+
void DoWrite(const void* buf, size_t len) override;
size_t DoNext(void** ptr) override;
void DoUndo(size_t len) override;