diff options
author | akozhikhov <akozhikhov@yandex-team.com> | 2024-07-18 04:12:31 +0300 |
---|---|---|
committer | akozhikhov <akozhikhov@yandex-team.com> | 2024-07-18 04:21:48 +0300 |
commit | 9a82c7b66f562796be8bb78d80c224b511876ea8 (patch) | |
tree | c7562864f4406655258b83a9e2165134daeb0632 | |
parent | 035857f0e62eccf50321c716520fce5fc6a7b6bf (diff) | |
download | ydb-9a82c7b66f562796be8bb78d80c224b511876ea8.tar.gz |
YT-22094: Table replicator memory management
30ee9b5745a0e15a0bdbe3efa0329361cb52462d
-rw-r--r-- | yt/yt/client/table_client/row_buffer.cpp | 12 | ||||
-rw-r--r-- | yt/yt/client/table_client/row_buffer.h | 27 | ||||
-rw-r--r-- | yt/yt/core/misc/memory_usage_tracker.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/misc/memory_usage_tracker.h | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/bus/channel.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/public.h | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/rpc_ut.cpp | 4 |
8 files changed, 32 insertions, 25 deletions
diff --git a/yt/yt/client/table_client/row_buffer.cpp b/yt/yt/client/table_client/row_buffer.cpp index 40a2703ea5..b6be52bd18 100644 --- a/yt/yt/client/table_client/row_buffer.cpp +++ b/yt/yt/client/table_client/row_buffer.cpp @@ -8,6 +8,18 @@ namespace NYT::NTableClient { //////////////////////////////////////////////////////////////////////////////// +TRowBuffer::TRowBuffer( + TRefCountedTypeCookie tagCookie, + IMemoryChunkProviderPtr chunkProvider, + size_t startChunkSize, + IMemoryUsageTrackerPtr tracker) + : MemoryTracker_(std::move(tracker)) + , Pool_( + tagCookie, + std::move(chunkProvider), + startChunkSize) +{ } + TChunkedMemoryPool* TRowBuffer::GetPool() { return &Pool_; diff --git a/yt/yt/client/table_client/row_buffer.h b/yt/yt/client/table_client/row_buffer.h index 2f9a8d4d22..05ab9635c4 100644 --- a/yt/yt/client/table_client/row_buffer.h +++ b/yt/yt/client/table_client/row_buffer.h @@ -4,10 +4,10 @@ #include "unversioned_row.h" #include "versioned_row.h" -#include <library/cpp/yt/memory/chunked_memory_pool.h> - #include <yt/yt/core/misc/memory_usage_tracker.h> +#include <library/cpp/yt/memory/chunked_memory_pool.h> + namespace NYT::NTableClient { //////////////////////////////////////////////////////////////////////////////// @@ -27,34 +27,28 @@ public: TRefCountedTypeCookie tagCookie, IMemoryChunkProviderPtr chunkProvider, size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize, - IMemoryUsageTrackerPtr tracker = nullptr) - : Pool_( - tagCookie, - std::move(chunkProvider), - startChunkSize) - , MemoryTracker_(std::move(tracker)) - { } + IMemoryUsageTrackerPtr tracker = nullptr); template <class TTag = TDefaultRowBufferPoolTag> explicit TRowBuffer( - TTag = TDefaultRowBufferPoolTag(), + TTag /*tag*/ = TDefaultRowBufferPoolTag(), size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize, IMemoryUsageTrackerPtr tracker = nullptr) - : Pool_( + : MemoryTracker_(std::move(tracker)) + , Pool_( TTag(), startChunkSize) - , MemoryTracker_(std::move(tracker)) { } template <class TTag> TRowBuffer( - TTag, + TTag /*tag*/, IMemoryChunkProviderPtr chunkProvider, IMemoryUsageTrackerPtr tracker = nullptr) - : Pool_( + : MemoryTracker_(std::move(tracker)) + , Pool_( GetRefCountedTypeCookie<TTag>(), std::move(chunkProvider)) - , MemoryTracker_(std::move(tracker)) { } TChunkedMemoryPool* GetPool(); @@ -114,8 +108,9 @@ public: void Purge(); private: + const IMemoryUsageTrackerPtr MemoryTracker_; + TChunkedMemoryPool Pool_; - IMemoryUsageTrackerPtr MemoryTracker_; std::optional<TMemoryUsageTrackerGuard> MemoryGuard_; void ValidateNoOverflow(); diff --git a/yt/yt/core/misc/memory_usage_tracker.cpp b/yt/yt/core/misc/memory_usage_tracker.cpp index 0876e010d8..e81a120a6f 100644 --- a/yt/yt/core/misc/memory_usage_tracker.cpp +++ b/yt/yt/core/misc/memory_usage_tracker.cpp @@ -196,7 +196,7 @@ i64 TMemoryUsageTrackerGuard::GetSize() const void TMemoryUsageTrackerGuard::SetSize(i64 size) { - auto ignoredError = SetSizeGeneric(size, [&] (i64 delta) { + auto ignoredError = SetSizeImpl(size, [&] (i64 delta) { Tracker_->Acquire(delta); return TError{}; }); @@ -206,12 +206,12 @@ void TMemoryUsageTrackerGuard::SetSize(i64 size) TError TMemoryUsageTrackerGuard::TrySetSize(i64 size) { - return SetSizeGeneric(size, [&] (i64 delta) { + return SetSizeImpl(size, [&] (i64 delta) { return Tracker_->TryAcquire(delta); }); } -TError TMemoryUsageTrackerGuard::SetSizeGeneric(i64 size, auto acquirer) +TError TMemoryUsageTrackerGuard::SetSizeImpl(i64 size, auto acquirer) { if (!Tracker_) { return {}; diff --git a/yt/yt/core/misc/memory_usage_tracker.h b/yt/yt/core/misc/memory_usage_tracker.h index 54b078697a..00d93cb30f 100644 --- a/yt/yt/core/misc/memory_usage_tracker.h +++ b/yt/yt/core/misc/memory_usage_tracker.h @@ -93,7 +93,7 @@ private: i64 Granularity_ = 0; void MoveFrom(TMemoryUsageTrackerGuard&& other); - TError SetSizeGeneric(i64 size, auto acquirer); + TError SetSizeImpl(i64 size, auto acquirer); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index 70809661c0..e62e8d3404 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -923,7 +923,7 @@ private: message = TrackMemory(MemoryUsageTracker_, std::move(message)); if (MemoryUsageTracker_->IsExceeded()) { auto error = TError( - NRpc::EErrorCode::MemoryOverflow, + NRpc::EErrorCode::MemoryPressure, "Response is dropped due to high memory pressure"); requestControl->ProfileError(error); NotifyError( diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index c44c3d501c..db2adda4b1 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -183,7 +183,7 @@ YT_DEFINE_ERROR_ENUM( ((Overloaded) (118)) // The server is currently overloaded and unable to handle additional requests. // The client should try to reduce their request rate until the server has had a chance to recover. ((SslError) (static_cast<int>(NBus::EErrorCode::SslError))) - ((MemoryOverflow) (120)) + ((MemoryPressure) (120)) ((GlobalDiscoveryError) (121)) // Single peer discovery interrupts discovery session. ); diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index f85ca2ea6a..b1db24e718 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -1717,7 +1717,7 @@ void TServiceBase::HandleRequest( message = TrackMemory(MemoryUsageTracker_, std::move(message)); if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) { return replyError(TError( - NRpc::EErrorCode::MemoryOverflow, + NRpc::EErrorCode::MemoryPressure, "Request is dropped due to high memory pressure")); } diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp index e63a471b13..310cb73f67 100644 --- a/yt/yt/core/rpc/unittests/rpc_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp @@ -813,7 +813,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit) EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK()); } -TYPED_TEST(TNotGrpcTest, RequestMemoryOverflowException) +TYPED_TEST(TNotGrpcTest, RequesMemoryPressureException) { auto memoryUsageTracker = this->GetMemoryUsageTracker(); memoryUsageTracker->ClearTotalUsage(); @@ -828,7 +828,7 @@ TYPED_TEST(TNotGrpcTest, RequestMemoryOverflowException) auto result = WaitFor(req->Invoke().AsVoid()); // Limit of memory is 32 MB. - EXPECT_EQ(NRpc::EErrorCode::MemoryOverflow, req->Invoke().Get().GetCode()); + EXPECT_EQ(NRpc::EErrorCode::MemoryPressure, req->Invoke().Get().GetCode()); } TYPED_TEST(TNotGrpcTest, MemoryTracking) |