summaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authornadya02 <[email protected]>2025-03-14 19:14:50 +0300
committernadya02 <[email protected]>2025-03-14 19:34:25 +0300
commit643e648fe31a1ad9250264986d147702c868a9c5 (patch)
tree5a8b891bb359ae98b0f5b91e56b4cca3e72d1ece /library/cpp
parentfce2a692db534e71e5f317d2f7b40ea89753e187 (diff)
YT-23989: Move memory tracker to library/cpp/yt
commit_hash:5dad6ada81567dcb5da6ef1efe47e1738196d219
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/yt/memory/memory_usage_tracker.cpp405
-rw-r--r--library/cpp/yt/memory/memory_usage_tracker.h180
-rw-r--r--library/cpp/yt/memory/public.h2
-rw-r--r--library/cpp/yt/memory/ya.make1
4 files changed, 588 insertions, 0 deletions
diff --git a/library/cpp/yt/memory/memory_usage_tracker.cpp b/library/cpp/yt/memory/memory_usage_tracker.cpp
new file mode 100644
index 00000000000..b87e0bb89ba
--- /dev/null
+++ b/library/cpp/yt/memory/memory_usage_tracker.cpp
@@ -0,0 +1,405 @@
+#include "memory_usage_tracker.h"
+
+#include "leaky_ref_counted_singleton.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TNullMemoryUsageTracker
+ : public IMemoryUsageTracker
+{
+public:
+ TError TryAcquire(i64 /*size*/) override
+ {
+ return {};
+ }
+
+ TError TryChange(i64 /*size*/) override
+ {
+ return {};
+ }
+
+ bool Acquire(i64 /*size*/) override
+ {
+ return false;
+ }
+
+ void Release(i64 /*size*/) override
+ { }
+
+ void SetLimit(i64 /*size*/) override
+ { }
+
+ i64 GetLimit() const override
+ {
+ return std::numeric_limits<i64>::max();
+ }
+
+ i64 GetUsed() const override
+ {
+ return 0;
+ }
+
+ i64 GetFree() const override
+ {
+ return std::numeric_limits<i64>::max();
+ }
+
+ bool IsExceeded() const override
+ {
+ return false;
+ }
+
+ TSharedRef Track(
+ TSharedRef reference,
+ bool /*keepExistingTracking*/) override
+ {
+ return reference;
+ }
+
+ TErrorOr<TSharedRef> TryTrack(
+ TSharedRef reference,
+ bool /*keepExistingTracking*/) override
+ {
+ return reference;
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IMemoryUsageTrackerPtr GetNullMemoryUsageTracker()
+{
+ return LeakyRefCountedSingleton<TNullMemoryUsageTracker>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TMemoryUsageTrackerGuard::TMemoryUsageTrackerGuard(TMemoryUsageTrackerGuard&& other)
+{
+ MoveFrom(std::move(other));
+}
+
+TMemoryUsageTrackerGuard::~TMemoryUsageTrackerGuard()
+{
+ Release();
+}
+
+TMemoryUsageTrackerGuard& TMemoryUsageTrackerGuard::operator=(TMemoryUsageTrackerGuard&& other)
+{
+ if (this != &other) {
+ Release();
+ MoveFrom(std::move(other));
+ }
+ return *this;
+}
+
+void TMemoryUsageTrackerGuard::MoveFrom(TMemoryUsageTrackerGuard&& other)
+{
+ Tracker_ = other.Tracker_;
+ Size_ = other.Size_;
+ AcquiredSize_ = other.AcquiredSize_;
+ Granularity_ = other.Granularity_;
+
+ other.Tracker_ = nullptr;
+ other.Size_ = 0;
+ other.AcquiredSize_ = 0;
+ other.Granularity_ = 0;
+}
+
+TMemoryUsageTrackerGuard TMemoryUsageTrackerGuard::Build(
+ IMemoryUsageTrackerPtr tracker,
+ i64 granularity)
+{
+ if (!tracker) {
+ return {};
+ }
+
+ TMemoryUsageTrackerGuard guard;
+ guard.Tracker_ = tracker;
+ guard.Size_ = 0;
+ guard.Granularity_ = granularity;
+ return guard;
+}
+
+TMemoryUsageTrackerGuard TMemoryUsageTrackerGuard::Acquire(
+ IMemoryUsageTrackerPtr tracker,
+ i64 size,
+ i64 granularity)
+{
+ if (!tracker) {
+ return {};
+ }
+
+ YT_VERIFY(size >= 0);
+ TMemoryUsageTrackerGuard guard;
+ guard.Tracker_ = tracker;
+ guard.Size_ = size;
+ guard.Granularity_ = granularity;
+ if (size >= granularity) {
+ guard.AcquiredSize_ = size;
+ tracker->Acquire(size);
+ }
+ return guard;
+}
+
+TErrorOr<TMemoryUsageTrackerGuard> TMemoryUsageTrackerGuard::TryAcquire(
+ IMemoryUsageTrackerPtr tracker,
+ i64 size,
+ i64 granularity)
+{
+ if (!tracker) {
+ return {};
+ }
+
+ YT_VERIFY(size >= 0);
+
+ auto error = tracker->TryAcquire(size);
+ if (!error.IsOK()) {
+ return error;
+ }
+ TMemoryUsageTrackerGuard guard;
+ guard.Tracker_ = tracker;
+ guard.Size_ = size;
+ guard.AcquiredSize_ = size;
+ guard.Granularity_ = granularity;
+ return std::move(guard);
+}
+
+void TMemoryUsageTrackerGuard::Release()
+{
+ if (Tracker_) {
+ if (AcquiredSize_) {
+ Tracker_->Release(AcquiredSize_);
+ }
+
+ ReleaseNoReclaim();
+ }
+}
+
+void TMemoryUsageTrackerGuard::ReleaseNoReclaim()
+{
+ Tracker_.Reset();
+ Size_ = 0;
+ AcquiredSize_ = 0;
+ Granularity_ = 0;
+}
+
+TMemoryUsageTrackerGuard::operator bool() const
+{
+ return Tracker_.operator bool();
+}
+
+i64 TMemoryUsageTrackerGuard::GetSize() const
+{
+ return Size_;
+}
+
+void TMemoryUsageTrackerGuard::SetSize(i64 size)
+{
+ auto ignoredError = SetSizeImpl(size, [&] (i64 delta) {
+ Tracker_->Acquire(delta);
+ return TError{};
+ });
+
+ Y_UNUSED(ignoredError);
+}
+
+TError TMemoryUsageTrackerGuard::TrySetSize(i64 size)
+{
+ return SetSizeImpl(size, [&] (i64 delta) {
+ return Tracker_->TryAcquire(delta);
+ });
+}
+
+TError TMemoryUsageTrackerGuard::SetSizeImpl(i64 size, auto acquirer)
+{
+ if (!Tracker_) {
+ return {};
+ }
+
+ YT_VERIFY(size >= 0);
+ Size_ = size;
+ if (std::abs(Size_ - AcquiredSize_) >= Granularity_) {
+ if (Size_ > AcquiredSize_) {
+ if (auto result = acquirer(Size_ - AcquiredSize_); !result.IsOK()) {
+ return result;
+ }
+ } else {
+ Tracker_->Release(AcquiredSize_ - Size_);
+ }
+ AcquiredSize_ = Size_;
+ }
+
+ return {};
+}
+
+void TMemoryUsageTrackerGuard::IncreaseSize(i64 sizeDelta)
+{
+ SetSize(Size_ + sizeDelta);
+}
+
+void TMemoryUsageTrackerGuard::DecreaseSize(i64 sizeDelta)
+{
+ SetSize(Size_ - sizeDelta);
+}
+
+TMemoryUsageTrackerGuard TMemoryUsageTrackerGuard::TransferMemory(i64 size)
+{
+ YT_VERIFY(Size_ >= size);
+
+ auto acquiredDelta = std::min(AcquiredSize_, size);
+
+ Size_ -= size;
+ AcquiredSize_ -= acquiredDelta;
+
+ TMemoryUsageTrackerGuard guard;
+ guard.Tracker_ = Tracker_;
+ guard.Size_ = size;
+ guard.AcquiredSize_ = acquiredDelta;
+ guard.Granularity_ = Granularity_;
+ return std::move(guard);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TMemoryTrackedBlob::TMemoryTrackedBlob(
+ TBlob&& blob,
+ TMemoryUsageTrackerGuard&& guard)
+ : Blob_(std::move(blob))
+ , Guard_(std::move(guard))
+{ }
+
+TMemoryTrackedBlob TMemoryTrackedBlob::Build(
+ IMemoryUsageTrackerPtr tracker,
+ TRefCountedTypeCookie tagCookie)
+{
+ YT_VERIFY(tracker);
+
+ return TMemoryTrackedBlob(
+ TBlob(tagCookie),
+ TMemoryUsageTrackerGuard::Build(tracker));
+}
+
+void TMemoryTrackedBlob::Resize(
+ i64 size,
+ bool initializeStorage)
+{
+ YT_VERIFY(size >= 0);
+
+ Blob_.Resize(size, initializeStorage);
+ Guard_.SetSize(Blob_.Capacity());
+}
+
+TError TMemoryTrackedBlob::TryResize(
+ i64 size,
+ bool initializeStorage)
+{
+ YT_VERIFY(size >= 0);
+ auto result = Guard_.TrySetSize(size);
+
+ if (result.IsOK()) {
+ Blob_.Resize(size, initializeStorage);
+ return {};
+ } else {
+ return result;
+ }
+}
+
+void TMemoryTrackedBlob::Reserve(i64 size)
+{
+ YT_VERIFY(size >= 0);
+
+ Blob_.Reserve(size);
+ Guard_.SetSize(Blob_.Capacity());
+}
+
+TError TMemoryTrackedBlob::TryReserve(i64 size)
+{
+ YT_VERIFY(size >= 0);
+
+ auto result = Guard_.TrySetSize(size);
+
+ if (result.IsOK()) {
+ Blob_.Reserve(size);
+ return {};
+ } else {
+ return result;
+ }
+}
+
+char* TMemoryTrackedBlob::Begin()
+{
+ return Blob_.Begin();
+}
+
+char* TMemoryTrackedBlob::End()
+{
+ return Blob_.End();
+}
+
+size_t TMemoryTrackedBlob::Capacity() const
+{
+ return Blob_.Capacity();
+}
+
+size_t TMemoryTrackedBlob::Size() const
+{
+ return Blob_.Size();
+}
+
+void TMemoryTrackedBlob::Append(TRef ref)
+{
+ Blob_.Append(ref);
+ Guard_.SetSize(Blob_.Capacity());
+}
+
+void TMemoryTrackedBlob::Clear()
+{
+ Blob_.Clear();
+ Guard_.SetSize(Blob_.Capacity());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TErrorOr<TSharedRef> TryTrackMemory(
+ const IMemoryUsageTrackerPtr& tracker,
+ TSharedRef reference,
+ bool keepExistingTracking)
+{
+ if (!tracker || !reference) {
+ return reference;
+ }
+ return tracker->TryTrack(std::move(reference), keepExistingTracking);
+}
+
+TSharedRef TrackMemory(
+ const IMemoryUsageTrackerPtr& tracker,
+ TSharedRef reference,
+ bool keepExistingTracking)
+{
+ if (!tracker || !reference) {
+ return reference;
+ }
+ return tracker->Track(std::move(reference), keepExistingTracking);
+}
+
+TSharedRefArray TrackMemory(
+ const IMemoryUsageTrackerPtr& tracker,
+ TSharedRefArray array,
+ bool keepExistingTracking)
+{
+ if (!tracker || !array) {
+ return array;
+ }
+ TSharedRefArrayBuilder builder(array.Size());
+ for (const auto& part : array) {
+ builder.Add(tracker->Track(part, keepExistingTracking));
+ }
+ return builder.Finish();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
diff --git a/library/cpp/yt/memory/memory_usage_tracker.h b/library/cpp/yt/memory/memory_usage_tracker.h
new file mode 100644
index 00000000000..524f05a34a1
--- /dev/null
+++ b/library/cpp/yt/memory/memory_usage_tracker.h
@@ -0,0 +1,180 @@
+#pragma once
+
+#include "blob.h"
+#include "ref.h"
+
+#include <library/cpp/yt/error/error.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IMemoryUsageTracker
+ : public TRefCounted
+{
+ virtual TError TryAcquire(i64 size) = 0;
+ virtual TError TryChange(i64 size) = 0;
+ //! Returns true unless overcommit occurred.
+ virtual bool Acquire(i64 size) = 0;
+ virtual void Release(i64 size) = 0;
+ virtual void SetLimit(i64 size) = 0;
+ virtual i64 GetLimit() const = 0;
+ virtual i64 GetUsed() const = 0;
+ virtual i64 GetFree() const = 0;
+ virtual bool IsExceeded() const = 0;
+
+ //! Tracks memory used by references.
+ /*!
+ * Memory tracking is implemented by specific shared ref holders.
+ * #Track returns reference with a holder that wraps the old one and also
+ * enables accounting memory in memory tracker's internal state.
+
+ * Subsequent #Track calls for this reference drop memory reference tracker's
+ * holder unless #keepExistingTracking is true.
+ */
+ virtual TSharedRef Track(
+ TSharedRef reference,
+ bool keepHolder = false) = 0;
+ virtual TErrorOr<TSharedRef> TryTrack(
+ TSharedRef reference,
+ bool keepHolder) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IMemoryUsageTracker)
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IReservingMemoryUsageTracker
+ : public IMemoryUsageTracker
+{
+ virtual void ReleaseUnusedReservation() = 0;
+ virtual TError TryReserve(i64 size) = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IReservingMemoryUsageTracker)
+
+////////////////////////////////////////////////////////////////////////////////
+
+IMemoryUsageTrackerPtr GetNullMemoryUsageTracker();
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TMemoryUsageTrackerGuard
+ : private TNonCopyable
+{
+public:
+ TMemoryUsageTrackerGuard() = default;
+ TMemoryUsageTrackerGuard(const TMemoryUsageTrackerGuard& other) = delete;
+ TMemoryUsageTrackerGuard(TMemoryUsageTrackerGuard&& other);
+ ~TMemoryUsageTrackerGuard();
+
+ TMemoryUsageTrackerGuard& operator=(const TMemoryUsageTrackerGuard& other) = delete;
+ TMemoryUsageTrackerGuard& operator=(TMemoryUsageTrackerGuard&& other);
+
+ static TMemoryUsageTrackerGuard Build(
+ IMemoryUsageTrackerPtr tracker,
+ i64 granularity = 1);
+ static TMemoryUsageTrackerGuard Acquire(
+ IMemoryUsageTrackerPtr tracker,
+ i64 size,
+ i64 granularity = 1);
+ static TErrorOr<TMemoryUsageTrackerGuard> TryAcquire(
+ IMemoryUsageTrackerPtr tracker,
+ i64 size,
+ i64 granularity = 1);
+
+ void Release();
+
+ //! Releases the guard but does not return memory to the tracker.
+ //! The caller should care about releasing memory itself.
+ void ReleaseNoReclaim();
+
+ explicit operator bool() const;
+
+ i64 GetSize() const;
+ void SetSize(i64 size);
+ TError TrySetSize(i64 size);
+ void IncreaseSize(i64 sizeDelta);
+ void DecreaseSize(i64 sizeDelta);
+ TMemoryUsageTrackerGuard TransferMemory(i64 size);
+
+private:
+ IMemoryUsageTrackerPtr Tracker_;
+ i64 Size_ = 0;
+ i64 AcquiredSize_ = 0;
+ i64 Granularity_ = 0;
+
+ void MoveFrom(TMemoryUsageTrackerGuard&& other);
+ TError SetSizeImpl(i64 size, auto acquirer);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TMemoryTrackedBlob
+{
+public:
+ static TMemoryTrackedBlob Build(
+ IMemoryUsageTrackerPtr tracker,
+ TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultBlobTag>());
+
+ TMemoryTrackedBlob() = default;
+ TMemoryTrackedBlob(const TMemoryTrackedBlob& other) = delete;
+ TMemoryTrackedBlob(TMemoryTrackedBlob&& other) = default;
+ ~TMemoryTrackedBlob() = default;
+
+ TMemoryTrackedBlob& operator=(const TMemoryTrackedBlob& other) = delete;
+ TMemoryTrackedBlob& operator=(TMemoryTrackedBlob&& other) = default;
+
+ void Resize(
+ i64 size,
+ bool initializeStorage = true);
+
+ TError TryResize(
+ i64 size,
+ bool initializeStorage = true);
+
+ void Reserve(i64 capacity);
+
+ TError TryReserve(i64 capacity);
+
+ char* Begin();
+
+ char* End();
+
+ size_t Capacity() const;
+
+ size_t Size() const;
+
+ void Append(TRef ref);
+
+ void Clear();
+
+private:
+ TBlob Blob_;
+ TMemoryUsageTrackerGuard Guard_;
+
+ TMemoryTrackedBlob(
+ TBlob&& blob,
+ TMemoryUsageTrackerGuard&& guard);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TErrorOr<TSharedRef> TryTrackMemory(
+ const IMemoryUsageTrackerPtr& tracker,
+ TSharedRef reference,
+ bool keepExistingTracking = false);
+
+TSharedRef TrackMemory(
+ const IMemoryUsageTrackerPtr& tracker,
+ TSharedRef reference,
+ bool keepExistingTracking = false);
+
+TSharedRefArray TrackMemory(
+ const IMemoryUsageTrackerPtr& tracker,
+ TSharedRefArray array,
+ bool keepExistingTracking = false);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h
index e348c23439c..b931f980efb 100644
--- a/library/cpp/yt/memory/public.h
+++ b/library/cpp/yt/memory/public.h
@@ -12,6 +12,8 @@ constexpr size_t CacheLineSize = 64;
class TChunkedMemoryPool;
DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider)
+DECLARE_REFCOUNTED_STRUCT(IMemoryUsageTracker)
+DECLARE_REFCOUNTED_STRUCT(IReservingMemoryUsageTracker)
DECLARE_REFCOUNTED_STRUCT(TSharedRangeHolder)
using TMemoryTag = ui32;
diff --git a/library/cpp/yt/memory/ya.make b/library/cpp/yt/memory/ya.make
index d0c377cf3d1..18ce62844c2 100644
--- a/library/cpp/yt/memory/ya.make
+++ b/library/cpp/yt/memory/ya.make
@@ -15,6 +15,7 @@ SRCS(
chunked_memory_pool_output.cpp
chunked_output_stream.cpp
memory_tag.cpp
+ memory_usage_tracker.cpp
new.cpp
poison.cpp
ref.cpp