diff options
| author | nadya02 <[email protected]> | 2025-03-14 19:14:50 +0300 |
|---|---|---|
| committer | nadya02 <[email protected]> | 2025-03-14 19:34:25 +0300 |
| commit | 643e648fe31a1ad9250264986d147702c868a9c5 (patch) | |
| tree | 5a8b891bb359ae98b0f5b91e56b4cca3e72d1ece /library/cpp | |
| parent | fce2a692db534e71e5f317d2f7b40ea89753e187 (diff) | |
YT-23989: Move memory tracker to library/cpp/yt
commit_hash:5dad6ada81567dcb5da6ef1efe47e1738196d219
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/yt/memory/memory_usage_tracker.cpp | 405 | ||||
| -rw-r--r-- | library/cpp/yt/memory/memory_usage_tracker.h | 180 | ||||
| -rw-r--r-- | library/cpp/yt/memory/public.h | 2 | ||||
| -rw-r--r-- | library/cpp/yt/memory/ya.make | 1 |
4 files changed, 588 insertions, 0 deletions
diff --git a/library/cpp/yt/memory/memory_usage_tracker.cpp b/library/cpp/yt/memory/memory_usage_tracker.cpp new file mode 100644 index 00000000000..b87e0bb89ba --- /dev/null +++ b/library/cpp/yt/memory/memory_usage_tracker.cpp @@ -0,0 +1,405 @@ +#include "memory_usage_tracker.h" + +#include "leaky_ref_counted_singleton.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TNullMemoryUsageTracker + : public IMemoryUsageTracker +{ +public: + TError TryAcquire(i64 /*size*/) override + { + return {}; + } + + TError TryChange(i64 /*size*/) override + { + return {}; + } + + bool Acquire(i64 /*size*/) override + { + return false; + } + + void Release(i64 /*size*/) override + { } + + void SetLimit(i64 /*size*/) override + { } + + i64 GetLimit() const override + { + return std::numeric_limits<i64>::max(); + } + + i64 GetUsed() const override + { + return 0; + } + + i64 GetFree() const override + { + return std::numeric_limits<i64>::max(); + } + + bool IsExceeded() const override + { + return false; + } + + TSharedRef Track( + TSharedRef reference, + bool /*keepExistingTracking*/) override + { + return reference; + } + + TErrorOr<TSharedRef> TryTrack( + TSharedRef reference, + bool /*keepExistingTracking*/) override + { + return reference; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IMemoryUsageTrackerPtr GetNullMemoryUsageTracker() +{ + return LeakyRefCountedSingleton<TNullMemoryUsageTracker>(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TMemoryUsageTrackerGuard::TMemoryUsageTrackerGuard(TMemoryUsageTrackerGuard&& other) +{ + MoveFrom(std::move(other)); +} + +TMemoryUsageTrackerGuard::~TMemoryUsageTrackerGuard() +{ + Release(); +} + +TMemoryUsageTrackerGuard& TMemoryUsageTrackerGuard::operator=(TMemoryUsageTrackerGuard&& other) +{ + if (this != &other) { + Release(); + MoveFrom(std::move(other)); + } + return *this; +} + +void TMemoryUsageTrackerGuard::MoveFrom(TMemoryUsageTrackerGuard&& other) +{ + Tracker_ = other.Tracker_; + Size_ = other.Size_; + AcquiredSize_ = other.AcquiredSize_; + Granularity_ = other.Granularity_; + + other.Tracker_ = nullptr; + other.Size_ = 0; + other.AcquiredSize_ = 0; + other.Granularity_ = 0; +} + +TMemoryUsageTrackerGuard TMemoryUsageTrackerGuard::Build( + IMemoryUsageTrackerPtr tracker, + i64 granularity) +{ + if (!tracker) { + return {}; + } + + TMemoryUsageTrackerGuard guard; + guard.Tracker_ = tracker; + guard.Size_ = 0; + guard.Granularity_ = granularity; + return guard; +} + +TMemoryUsageTrackerGuard TMemoryUsageTrackerGuard::Acquire( + IMemoryUsageTrackerPtr tracker, + i64 size, + i64 granularity) +{ + if (!tracker) { + return {}; + } + + YT_VERIFY(size >= 0); + TMemoryUsageTrackerGuard guard; + guard.Tracker_ = tracker; + guard.Size_ = size; + guard.Granularity_ = granularity; + if (size >= granularity) { + guard.AcquiredSize_ = size; + tracker->Acquire(size); + } + return guard; +} + +TErrorOr<TMemoryUsageTrackerGuard> TMemoryUsageTrackerGuard::TryAcquire( + IMemoryUsageTrackerPtr tracker, + i64 size, + i64 granularity) +{ + if (!tracker) { + return {}; + } + + YT_VERIFY(size >= 0); + + auto error = tracker->TryAcquire(size); + if (!error.IsOK()) { + return error; + } + TMemoryUsageTrackerGuard guard; + guard.Tracker_ = tracker; + guard.Size_ = size; + guard.AcquiredSize_ = size; + guard.Granularity_ = granularity; + return std::move(guard); +} + +void TMemoryUsageTrackerGuard::Release() +{ + if (Tracker_) { + if (AcquiredSize_) { + Tracker_->Release(AcquiredSize_); + } + + ReleaseNoReclaim(); + } +} + +void TMemoryUsageTrackerGuard::ReleaseNoReclaim() +{ + Tracker_.Reset(); + Size_ = 0; + AcquiredSize_ = 0; + Granularity_ = 0; +} + +TMemoryUsageTrackerGuard::operator bool() const +{ + return Tracker_.operator bool(); +} + +i64 TMemoryUsageTrackerGuard::GetSize() const +{ + return Size_; +} + +void TMemoryUsageTrackerGuard::SetSize(i64 size) +{ + auto ignoredError = SetSizeImpl(size, [&] (i64 delta) { + Tracker_->Acquire(delta); + return TError{}; + }); + + Y_UNUSED(ignoredError); +} + +TError TMemoryUsageTrackerGuard::TrySetSize(i64 size) +{ + return SetSizeImpl(size, [&] (i64 delta) { + return Tracker_->TryAcquire(delta); + }); +} + +TError TMemoryUsageTrackerGuard::SetSizeImpl(i64 size, auto acquirer) +{ + if (!Tracker_) { + return {}; + } + + YT_VERIFY(size >= 0); + Size_ = size; + if (std::abs(Size_ - AcquiredSize_) >= Granularity_) { + if (Size_ > AcquiredSize_) { + if (auto result = acquirer(Size_ - AcquiredSize_); !result.IsOK()) { + return result; + } + } else { + Tracker_->Release(AcquiredSize_ - Size_); + } + AcquiredSize_ = Size_; + } + + return {}; +} + +void TMemoryUsageTrackerGuard::IncreaseSize(i64 sizeDelta) +{ + SetSize(Size_ + sizeDelta); +} + +void TMemoryUsageTrackerGuard::DecreaseSize(i64 sizeDelta) +{ + SetSize(Size_ - sizeDelta); +} + +TMemoryUsageTrackerGuard TMemoryUsageTrackerGuard::TransferMemory(i64 size) +{ + YT_VERIFY(Size_ >= size); + + auto acquiredDelta = std::min(AcquiredSize_, size); + + Size_ -= size; + AcquiredSize_ -= acquiredDelta; + + TMemoryUsageTrackerGuard guard; + guard.Tracker_ = Tracker_; + guard.Size_ = size; + guard.AcquiredSize_ = acquiredDelta; + guard.Granularity_ = Granularity_; + return std::move(guard); +} + +//////////////////////////////////////////////////////////////////////////////// + +TMemoryTrackedBlob::TMemoryTrackedBlob( + TBlob&& blob, + TMemoryUsageTrackerGuard&& guard) + : Blob_(std::move(blob)) + , Guard_(std::move(guard)) +{ } + +TMemoryTrackedBlob TMemoryTrackedBlob::Build( + IMemoryUsageTrackerPtr tracker, + TRefCountedTypeCookie tagCookie) +{ + YT_VERIFY(tracker); + + return TMemoryTrackedBlob( + TBlob(tagCookie), + TMemoryUsageTrackerGuard::Build(tracker)); +} + +void TMemoryTrackedBlob::Resize( + i64 size, + bool initializeStorage) +{ + YT_VERIFY(size >= 0); + + Blob_.Resize(size, initializeStorage); + Guard_.SetSize(Blob_.Capacity()); +} + +TError TMemoryTrackedBlob::TryResize( + i64 size, + bool initializeStorage) +{ + YT_VERIFY(size >= 0); + auto result = Guard_.TrySetSize(size); + + if (result.IsOK()) { + Blob_.Resize(size, initializeStorage); + return {}; + } else { + return result; + } +} + +void TMemoryTrackedBlob::Reserve(i64 size) +{ + YT_VERIFY(size >= 0); + + Blob_.Reserve(size); + Guard_.SetSize(Blob_.Capacity()); +} + +TError TMemoryTrackedBlob::TryReserve(i64 size) +{ + YT_VERIFY(size >= 0); + + auto result = Guard_.TrySetSize(size); + + if (result.IsOK()) { + Blob_.Reserve(size); + return {}; + } else { + return result; + } +} + +char* TMemoryTrackedBlob::Begin() +{ + return Blob_.Begin(); +} + +char* TMemoryTrackedBlob::End() +{ + return Blob_.End(); +} + +size_t TMemoryTrackedBlob::Capacity() const +{ + return Blob_.Capacity(); +} + +size_t TMemoryTrackedBlob::Size() const +{ + return Blob_.Size(); +} + +void TMemoryTrackedBlob::Append(TRef ref) +{ + Blob_.Append(ref); + Guard_.SetSize(Blob_.Capacity()); +} + +void TMemoryTrackedBlob::Clear() +{ + Blob_.Clear(); + Guard_.SetSize(Blob_.Capacity()); +} + +//////////////////////////////////////////////////////////////////////////////// + +TErrorOr<TSharedRef> TryTrackMemory( + const IMemoryUsageTrackerPtr& tracker, + TSharedRef reference, + bool keepExistingTracking) +{ + if (!tracker || !reference) { + return reference; + } + return tracker->TryTrack(std::move(reference), keepExistingTracking); +} + +TSharedRef TrackMemory( + const IMemoryUsageTrackerPtr& tracker, + TSharedRef reference, + bool keepExistingTracking) +{ + if (!tracker || !reference) { + return reference; + } + return tracker->Track(std::move(reference), keepExistingTracking); +} + +TSharedRefArray TrackMemory( + const IMemoryUsageTrackerPtr& tracker, + TSharedRefArray array, + bool keepExistingTracking) +{ + if (!tracker || !array) { + return array; + } + TSharedRefArrayBuilder builder(array.Size()); + for (const auto& part : array) { + builder.Add(tracker->Track(part, keepExistingTracking)); + } + return builder.Finish(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + diff --git a/library/cpp/yt/memory/memory_usage_tracker.h b/library/cpp/yt/memory/memory_usage_tracker.h new file mode 100644 index 00000000000..524f05a34a1 --- /dev/null +++ b/library/cpp/yt/memory/memory_usage_tracker.h @@ -0,0 +1,180 @@ +#pragma once + +#include "blob.h" +#include "ref.h" + +#include <library/cpp/yt/error/error.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +struct IMemoryUsageTracker + : public TRefCounted +{ + virtual TError TryAcquire(i64 size) = 0; + virtual TError TryChange(i64 size) = 0; + //! Returns true unless overcommit occurred. + virtual bool Acquire(i64 size) = 0; + virtual void Release(i64 size) = 0; + virtual void SetLimit(i64 size) = 0; + virtual i64 GetLimit() const = 0; + virtual i64 GetUsed() const = 0; + virtual i64 GetFree() const = 0; + virtual bool IsExceeded() const = 0; + + //! Tracks memory used by references. + /*! + * Memory tracking is implemented by specific shared ref holders. + * #Track returns reference with a holder that wraps the old one and also + * enables accounting memory in memory tracker's internal state. + + * Subsequent #Track calls for this reference drop memory reference tracker's + * holder unless #keepExistingTracking is true. + */ + virtual TSharedRef Track( + TSharedRef reference, + bool keepHolder = false) = 0; + virtual TErrorOr<TSharedRef> TryTrack( + TSharedRef reference, + bool keepHolder) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IMemoryUsageTracker) + +//////////////////////////////////////////////////////////////////////////////// + +struct IReservingMemoryUsageTracker + : public IMemoryUsageTracker +{ + virtual void ReleaseUnusedReservation() = 0; + virtual TError TryReserve(i64 size) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IReservingMemoryUsageTracker) + +//////////////////////////////////////////////////////////////////////////////// + +IMemoryUsageTrackerPtr GetNullMemoryUsageTracker(); + +//////////////////////////////////////////////////////////////////////////////// + +class TMemoryUsageTrackerGuard + : private TNonCopyable +{ +public: + TMemoryUsageTrackerGuard() = default; + TMemoryUsageTrackerGuard(const TMemoryUsageTrackerGuard& other) = delete; + TMemoryUsageTrackerGuard(TMemoryUsageTrackerGuard&& other); + ~TMemoryUsageTrackerGuard(); + + TMemoryUsageTrackerGuard& operator=(const TMemoryUsageTrackerGuard& other) = delete; + TMemoryUsageTrackerGuard& operator=(TMemoryUsageTrackerGuard&& other); + + static TMemoryUsageTrackerGuard Build( + IMemoryUsageTrackerPtr tracker, + i64 granularity = 1); + static TMemoryUsageTrackerGuard Acquire( + IMemoryUsageTrackerPtr tracker, + i64 size, + i64 granularity = 1); + static TErrorOr<TMemoryUsageTrackerGuard> TryAcquire( + IMemoryUsageTrackerPtr tracker, + i64 size, + i64 granularity = 1); + + void Release(); + + //! Releases the guard but does not return memory to the tracker. + //! The caller should care about releasing memory itself. + void ReleaseNoReclaim(); + + explicit operator bool() const; + + i64 GetSize() const; + void SetSize(i64 size); + TError TrySetSize(i64 size); + void IncreaseSize(i64 sizeDelta); + void DecreaseSize(i64 sizeDelta); + TMemoryUsageTrackerGuard TransferMemory(i64 size); + +private: + IMemoryUsageTrackerPtr Tracker_; + i64 Size_ = 0; + i64 AcquiredSize_ = 0; + i64 Granularity_ = 0; + + void MoveFrom(TMemoryUsageTrackerGuard&& other); + TError SetSizeImpl(i64 size, auto acquirer); +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TMemoryTrackedBlob +{ +public: + static TMemoryTrackedBlob Build( + IMemoryUsageTrackerPtr tracker, + TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultBlobTag>()); + + TMemoryTrackedBlob() = default; + TMemoryTrackedBlob(const TMemoryTrackedBlob& other) = delete; + TMemoryTrackedBlob(TMemoryTrackedBlob&& other) = default; + ~TMemoryTrackedBlob() = default; + + TMemoryTrackedBlob& operator=(const TMemoryTrackedBlob& other) = delete; + TMemoryTrackedBlob& operator=(TMemoryTrackedBlob&& other) = default; + + void Resize( + i64 size, + bool initializeStorage = true); + + TError TryResize( + i64 size, + bool initializeStorage = true); + + void Reserve(i64 capacity); + + TError TryReserve(i64 capacity); + + char* Begin(); + + char* End(); + + size_t Capacity() const; + + size_t Size() const; + + void Append(TRef ref); + + void Clear(); + +private: + TBlob Blob_; + TMemoryUsageTrackerGuard Guard_; + + TMemoryTrackedBlob( + TBlob&& blob, + TMemoryUsageTrackerGuard&& guard); +}; + +//////////////////////////////////////////////////////////////////////////////// + +TErrorOr<TSharedRef> TryTrackMemory( + const IMemoryUsageTrackerPtr& tracker, + TSharedRef reference, + bool keepExistingTracking = false); + +TSharedRef TrackMemory( + const IMemoryUsageTrackerPtr& tracker, + TSharedRef reference, + bool keepExistingTracking = false); + +TSharedRefArray TrackMemory( + const IMemoryUsageTrackerPtr& tracker, + TSharedRefArray array, + bool keepExistingTracking = false); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h index e348c23439c..b931f980efb 100644 --- a/library/cpp/yt/memory/public.h +++ b/library/cpp/yt/memory/public.h @@ -12,6 +12,8 @@ constexpr size_t CacheLineSize = 64; class TChunkedMemoryPool; DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider) +DECLARE_REFCOUNTED_STRUCT(IMemoryUsageTracker) +DECLARE_REFCOUNTED_STRUCT(IReservingMemoryUsageTracker) DECLARE_REFCOUNTED_STRUCT(TSharedRangeHolder) using TMemoryTag = ui32; diff --git a/library/cpp/yt/memory/ya.make b/library/cpp/yt/memory/ya.make index d0c377cf3d1..18ce62844c2 100644 --- a/library/cpp/yt/memory/ya.make +++ b/library/cpp/yt/memory/ya.make @@ -15,6 +15,7 @@ SRCS( chunked_memory_pool_output.cpp chunked_output_stream.cpp memory_tag.cpp + memory_usage_tracker.cpp new.cpp poison.cpp ref.cpp |
