aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorakozhikhov <akozhikhov@yandex-team.com>2024-07-18 04:12:31 +0300
committerakozhikhov <akozhikhov@yandex-team.com>2024-07-18 04:21:48 +0300
commit9a82c7b66f562796be8bb78d80c224b511876ea8 (patch)
treec7562864f4406655258b83a9e2165134daeb0632
parent035857f0e62eccf50321c716520fce5fc6a7b6bf (diff)
downloadydb-9a82c7b66f562796be8bb78d80c224b511876ea8.tar.gz
YT-22094: Table replicator memory management
30ee9b5745a0e15a0bdbe3efa0329361cb52462d
-rw-r--r--yt/yt/client/table_client/row_buffer.cpp12
-rw-r--r--yt/yt/client/table_client/row_buffer.h27
-rw-r--r--yt/yt/core/misc/memory_usage_tracker.cpp6
-rw-r--r--yt/yt/core/misc/memory_usage_tracker.h2
-rw-r--r--yt/yt/core/rpc/bus/channel.cpp2
-rw-r--r--yt/yt/core/rpc/public.h2
-rw-r--r--yt/yt/core/rpc/service_detail.cpp2
-rw-r--r--yt/yt/core/rpc/unittests/rpc_ut.cpp4
8 files changed, 32 insertions, 25 deletions
diff --git a/yt/yt/client/table_client/row_buffer.cpp b/yt/yt/client/table_client/row_buffer.cpp
index 40a2703ea5..b6be52bd18 100644
--- a/yt/yt/client/table_client/row_buffer.cpp
+++ b/yt/yt/client/table_client/row_buffer.cpp
@@ -8,6 +8,18 @@ namespace NYT::NTableClient {
////////////////////////////////////////////////////////////////////////////////
+TRowBuffer::TRowBuffer(
+ TRefCountedTypeCookie tagCookie,
+ IMemoryChunkProviderPtr chunkProvider,
+ size_t startChunkSize,
+ IMemoryUsageTrackerPtr tracker)
+ : MemoryTracker_(std::move(tracker))
+ , Pool_(
+ tagCookie,
+ std::move(chunkProvider),
+ startChunkSize)
+{ }
+
TChunkedMemoryPool* TRowBuffer::GetPool()
{
return &Pool_;
diff --git a/yt/yt/client/table_client/row_buffer.h b/yt/yt/client/table_client/row_buffer.h
index 2f9a8d4d22..05ab9635c4 100644
--- a/yt/yt/client/table_client/row_buffer.h
+++ b/yt/yt/client/table_client/row_buffer.h
@@ -4,10 +4,10 @@
#include "unversioned_row.h"
#include "versioned_row.h"
-#include <library/cpp/yt/memory/chunked_memory_pool.h>
-
#include <yt/yt/core/misc/memory_usage_tracker.h>
+#include <library/cpp/yt/memory/chunked_memory_pool.h>
+
namespace NYT::NTableClient {
////////////////////////////////////////////////////////////////////////////////
@@ -27,34 +27,28 @@ public:
TRefCountedTypeCookie tagCookie,
IMemoryChunkProviderPtr chunkProvider,
size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize,
- IMemoryUsageTrackerPtr tracker = nullptr)
- : Pool_(
- tagCookie,
- std::move(chunkProvider),
- startChunkSize)
- , MemoryTracker_(std::move(tracker))
- { }
+ IMemoryUsageTrackerPtr tracker = nullptr);
template <class TTag = TDefaultRowBufferPoolTag>
explicit TRowBuffer(
- TTag = TDefaultRowBufferPoolTag(),
+ TTag /*tag*/ = TDefaultRowBufferPoolTag(),
size_t startChunkSize = TChunkedMemoryPool::DefaultStartChunkSize,
IMemoryUsageTrackerPtr tracker = nullptr)
- : Pool_(
+ : MemoryTracker_(std::move(tracker))
+ , Pool_(
TTag(),
startChunkSize)
- , MemoryTracker_(std::move(tracker))
{ }
template <class TTag>
TRowBuffer(
- TTag,
+ TTag /*tag*/,
IMemoryChunkProviderPtr chunkProvider,
IMemoryUsageTrackerPtr tracker = nullptr)
- : Pool_(
+ : MemoryTracker_(std::move(tracker))
+ , Pool_(
GetRefCountedTypeCookie<TTag>(),
std::move(chunkProvider))
- , MemoryTracker_(std::move(tracker))
{ }
TChunkedMemoryPool* GetPool();
@@ -114,8 +108,9 @@ public:
void Purge();
private:
+ const IMemoryUsageTrackerPtr MemoryTracker_;
+
TChunkedMemoryPool Pool_;
- IMemoryUsageTrackerPtr MemoryTracker_;
std::optional<TMemoryUsageTrackerGuard> MemoryGuard_;
void ValidateNoOverflow();
diff --git a/yt/yt/core/misc/memory_usage_tracker.cpp b/yt/yt/core/misc/memory_usage_tracker.cpp
index 0876e010d8..e81a120a6f 100644
--- a/yt/yt/core/misc/memory_usage_tracker.cpp
+++ b/yt/yt/core/misc/memory_usage_tracker.cpp
@@ -196,7 +196,7 @@ i64 TMemoryUsageTrackerGuard::GetSize() const
void TMemoryUsageTrackerGuard::SetSize(i64 size)
{
- auto ignoredError = SetSizeGeneric(size, [&] (i64 delta) {
+ auto ignoredError = SetSizeImpl(size, [&] (i64 delta) {
Tracker_->Acquire(delta);
return TError{};
});
@@ -206,12 +206,12 @@ void TMemoryUsageTrackerGuard::SetSize(i64 size)
TError TMemoryUsageTrackerGuard::TrySetSize(i64 size)
{
- return SetSizeGeneric(size, [&] (i64 delta) {
+ return SetSizeImpl(size, [&] (i64 delta) {
return Tracker_->TryAcquire(delta);
});
}
-TError TMemoryUsageTrackerGuard::SetSizeGeneric(i64 size, auto acquirer)
+TError TMemoryUsageTrackerGuard::SetSizeImpl(i64 size, auto acquirer)
{
if (!Tracker_) {
return {};
diff --git a/yt/yt/core/misc/memory_usage_tracker.h b/yt/yt/core/misc/memory_usage_tracker.h
index 54b078697a..00d93cb30f 100644
--- a/yt/yt/core/misc/memory_usage_tracker.h
+++ b/yt/yt/core/misc/memory_usage_tracker.h
@@ -93,7 +93,7 @@ private:
i64 Granularity_ = 0;
void MoveFrom(TMemoryUsageTrackerGuard&& other);
- TError SetSizeGeneric(i64 size, auto acquirer);
+ TError SetSizeImpl(i64 size, auto acquirer);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp
index 70809661c0..e62e8d3404 100644
--- a/yt/yt/core/rpc/bus/channel.cpp
+++ b/yt/yt/core/rpc/bus/channel.cpp
@@ -923,7 +923,7 @@ private:
message = TrackMemory(MemoryUsageTracker_, std::move(message));
if (MemoryUsageTracker_->IsExceeded()) {
auto error = TError(
- NRpc::EErrorCode::MemoryOverflow,
+ NRpc::EErrorCode::MemoryPressure,
"Response is dropped due to high memory pressure");
requestControl->ProfileError(error);
NotifyError(
diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h
index c44c3d501c..db2adda4b1 100644
--- a/yt/yt/core/rpc/public.h
+++ b/yt/yt/core/rpc/public.h
@@ -183,7 +183,7 @@ YT_DEFINE_ERROR_ENUM(
((Overloaded) (118)) // The server is currently overloaded and unable to handle additional requests.
// The client should try to reduce their request rate until the server has had a chance to recover.
((SslError) (static_cast<int>(NBus::EErrorCode::SslError)))
- ((MemoryOverflow) (120))
+ ((MemoryPressure) (120))
((GlobalDiscoveryError) (121)) // Single peer discovery interrupts discovery session.
);
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index f85ca2ea6a..b1db24e718 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -1717,7 +1717,7 @@ void TServiceBase::HandleRequest(
message = TrackMemory(MemoryUsageTracker_, std::move(message));
if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded()) {
return replyError(TError(
- NRpc::EErrorCode::MemoryOverflow,
+ NRpc::EErrorCode::MemoryPressure,
"Request is dropped due to high memory pressure"));
}
diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp
index e63a471b13..310cb73f67 100644
--- a/yt/yt/core/rpc/unittests/rpc_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp
@@ -813,7 +813,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK());
}
-TYPED_TEST(TNotGrpcTest, RequestMemoryOverflowException)
+TYPED_TEST(TNotGrpcTest, RequesMemoryPressureException)
{
auto memoryUsageTracker = this->GetMemoryUsageTracker();
memoryUsageTracker->ClearTotalUsage();
@@ -828,7 +828,7 @@ TYPED_TEST(TNotGrpcTest, RequestMemoryOverflowException)
auto result = WaitFor(req->Invoke().AsVoid());
// Limit of memory is 32 MB.
- EXPECT_EQ(NRpc::EErrorCode::MemoryOverflow, req->Invoke().Get().GetCode());
+ EXPECT_EQ(NRpc::EErrorCode::MemoryPressure, req->Invoke().Get().GetCode());
}
TYPED_TEST(TNotGrpcTest, MemoryTracking)