aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-11-13 15:50:58 +0300
committerGitHub <noreply@github.com>2024-11-13 15:50:58 +0300
commit7d51c2b807c57d8ed232ea4305dba859d9bc0686 (patch)
tree3b7a8346e8e5a128d5fde3749bb0fb3812666281
parentf847a52da27bb02ef0d5616758accbba961c7902 (diff)
downloadydb-7d51c2b807c57d8ed232ea4305dba859d9bc0686.tar.gz
native memory control (#11559)
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/abstract/read_context.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h3
-rw-r--r--ydb/core/tx/limiter/grouped_memory/service/allocation.h12
-rw-r--r--ydb/core/tx/limiter/grouped_memory/service/counters.h8
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/abstract.h25
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/config.cpp5
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/config.h1
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/service.h9
13 files changed, 76 insertions, 16 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index fdf337e51c4..9b02a762aca 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -634,6 +634,7 @@ message TLimiterConfig {
message TGroupedMemoryLimiterConfig {
optional bool Enabled = 1 [default = true];
optional uint64 MemoryLimit = 2;
+ optional uint64 HardMemoryLimit = 3;
}
message TExternalIndexConfig {
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
index e3ec8567b86..e885d4461dc 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
@@ -52,6 +52,7 @@ private:
const TActorId ResourceSubscribeActorId;
const TActorId ReadCoordinatorActorId;
const TComputeShardingPolicy ComputeShardingPolicy;
+ TAtomic AbortFlag = 0;
public:
template <class T>
@@ -61,6 +62,13 @@ public:
return result;
}
+ void AbortWithError(const TString& errorMessage) {
+ if (AtomicCas(&AbortFlag, 1, 0)) {
+ NActors::TActivationContext::Send(
+ ScanActorId, std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail(errorMessage)));
+ }
+ }
+
bool IsReverse() const {
return ReadMetadata->IsDescSorted();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
index ab231db780d..f09e0e5b9a0 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp
@@ -189,6 +189,14 @@ TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) {
}
+void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
+ auto sourcePtr = Source.lock();
+ if (sourcePtr) {
+ sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
+ "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
+ }
+}
+
TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
index 1fc9f8bce54..f9e5774c4ab 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h
@@ -237,7 +237,7 @@ protected:
NColumnShard::TCounterGuard TasksGuard;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
-
+ virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
public:
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step);
};
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h
index bbe2d11ccb3..074a1c42f96 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h
@@ -77,6 +77,9 @@ private:
virtual bool DoApply(IDataReader& indexedDataRead) const override;
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
+ virtual void DoOnAllocationImpossible(const TString& errorMessage) override {
+ Context->GetCommonContext()->AbortWithError("cannot allocate memory for merge task: '" + errorMessage + "'");
+ }
public:
TBaseMergeTask(const std::shared_ptr<TMergingContext>& mergingContext, const std::shared_ptr<TSpecialReadContext>& readContext)
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp
index b13dbcf950d..55811c2b65d 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp
@@ -195,7 +195,12 @@ TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation(
, AccessorsManager(context->GetDataAccessorsManager())
, Request(request)
, WaitingCountersGuard(context->GetCounters().GetFetcherAcessorsGuard())
- , OwnerId(context->GetScanActorId()) {
+ , OwnerId(context->GetScanActorId())
+ , Context(context) {
+}
+
+void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
+ Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage);
}
} // namespace NKikimr::NOlap::NReader::NSysView::NChunks
diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h
index 1022f8b3f76..b932e86533f 100644
--- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h
+++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h
@@ -109,12 +109,15 @@ private:
std::shared_ptr<TDataAccessorsRequest> Request;
NColumnShard::TCounterGuard WaitingCountersGuard;
const NActors::TActorId OwnerId;
+ const std::shared_ptr<NReader::TReadContext> Context;
+
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*selfPtr*/) override {
Guard = std::move(guard);
AccessorsManager->AskData(std::move(Request));
return true;
}
+ virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
if (result.HasErrors()) {
diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.h b/ydb/core/tx/limiter/grouped_memory/service/allocation.h
index dcbf2971367..e2ca06bd286 100644
--- a/ydb/core/tx/limiter/grouped_memory/service/allocation.h
+++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.h
@@ -48,12 +48,16 @@ public:
AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
"allocation_internal_group_id", AllocationInternalGroupId);
+ auto allocationResult = Stage->Allocate(AllocatedVolume);
+ if (allocationResult.IsFail()) {
+ AllocationFailed = true;
+ Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
+ return false;
+ }
const bool result = Allocation->OnAllocated(
std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
- if (result) {
- Stage->Allocate(AllocatedVolume);
- } else {
- Stage->Free(AllocatedVolume, false);
+ if (!result) {
+ Stage->Free(AllocatedVolume, true);
AllocationFailed = true;
}
Allocation = nullptr;
diff --git a/ydb/core/tx/limiter/grouped_memory/service/counters.h b/ydb/core/tx/limiter/grouped_memory/service/counters.h
index 3c96b3b8b9a..1d55b7b17f4 100644
--- a/ydb/core/tx/limiter/grouped_memory/service/counters.h
+++ b/ydb/core/tx/limiter/grouped_memory/service/counters.h
@@ -10,6 +10,7 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr AllocatedChunks;
NMonitoring::TDynamicCounters::TCounterPtr WaitingBytes;
NMonitoring::TDynamicCounters::TCounterPtr WaitingChunks;
+ NMonitoring::TDynamicCounters::TCounterPtr AllocationFailCount;
public:
TStageCounters(const TCommonCountersOwner& owner, const TString& name)
@@ -17,7 +18,12 @@ public:
, AllocatedBytes(TBase::GetValue("Allocated/Bytes"))
, AllocatedChunks(TBase::GetValue("Allocated/Count"))
, WaitingBytes(TBase::GetValue("Waiting/Bytes"))
- , WaitingChunks(TBase::GetValue("Waiting/Count")) {
+ , WaitingChunks(TBase::GetValue("Waiting/Count"))
+ , AllocationFailCount(TBase::GetValue("AllocationFails/Count")) {
+ }
+
+ void OnCannotAllocate() {
+ AllocationFailCount->Add(1);
}
void Add(const ui64 volume, const bool allocated) {
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
index d92120f46fb..b0b1b11dce8 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
+++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
@@ -5,6 +5,7 @@
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/actors/core/log.h>
+#include <ydb/library/conclusion/status.h>
namespace NKikimr::NOlap::NGroupedMemoryManager {
@@ -95,6 +96,7 @@ class TStageFeatures {
private:
YDB_READONLY_DEF(TString, Name);
YDB_READONLY(ui64, Limit, 0);
+ YDB_READONLY(ui64, HardLimit, 0);
YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage);
YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting);
std::shared_ptr<TStageFeatures> Owner;
@@ -114,15 +116,20 @@ public:
return Usage.Val() + Waiting.Val();
}
- TStageFeatures(
- const TString& name, const ui64 limit, const std::shared_ptr<TStageFeatures>& owner, const std::shared_ptr<TStageCounters>& counters)
+ TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
+ const std::shared_ptr<TStageCounters>& counters)
: Name(name)
, Limit(limit)
+ , HardLimit(hardLimit)
, Owner(owner)
, Counters(counters) {
}
- void Allocate(const ui64 volume) {
+ [[nodiscard]] TConclusionStatus Allocate(const ui64 volume) {
+ if (HardLimit < Usage.Val() + volume) {
+ Counters->OnCannotAllocate();
+ return TConclusionStatus::Fail(TStringBuilder() << "limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ";");
+ }
Waiting.Sub(volume);
Usage.Add(volume);
if (Counters) {
@@ -130,8 +137,13 @@ public:
Counters->Sub(volume, false);
}
if (Owner) {
- Owner->Allocate(volume);
+ const auto ownerResult = Owner->Allocate(volume);
+ if (ownerResult.IsFail()) {
+ Free(volume, true);
+ return ownerResult;
+ }
}
+ return TConclusionStatus::Success();
}
void Free(const ui64 volume, const bool allocated) {
@@ -199,6 +211,7 @@ private:
YDB_READONLY(ui64, Identifier, Counter.Inc());
YDB_READONLY(ui64, Memory, 0);
bool Allocated = false;
+ virtual void DoOnAllocationImpossible(const TString& errorMessage) = 0;
virtual bool DoOnAllocated(
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) = 0;
@@ -216,6 +229,10 @@ public:
return Allocated;
}
+ void OnAllocationImpossible(const TString& errorMessage) {
+ DoOnAllocationImpossible(errorMessage);
+ }
+
[[nodiscard]] bool OnAllocated(
std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation);
};
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/config.cpp b/ydb/core/tx/limiter/grouped_memory/usage/config.cpp
index 17fe5597574..ee01e1ef3f6 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/config.cpp
+++ b/ydb/core/tx/limiter/grouped_memory/usage/config.cpp
@@ -7,13 +7,16 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TGroupedMemoryLimiterCon
if (config.HasMemoryLimit()) {
MemoryLimit = config.GetMemoryLimit();
}
+ if (config.HasHardMemoryLimit()) {
+ HardMemoryLimit = config.GetHardMemoryLimit();
+ }
Enabled = config.GetEnabled();
return true;
}
TString TConfig::DebugString() const {
TStringBuilder sb;
- sb << "MemoryLimit=" << MemoryLimit << ";Enabled=" << Enabled << ";";
+ sb << "MemoryLimit=" << MemoryLimit << ";HardMemoryLimit=" << HardMemoryLimit << ";Enabled=" << Enabled << ";";
return sb;
}
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/config.h b/ydb/core/tx/limiter/grouped_memory/usage/config.h
index 91a9b5bc7af..c3a69680a19 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/config.h
+++ b/ydb/core/tx/limiter/grouped_memory/usage/config.h
@@ -8,6 +8,7 @@ class TConfig {
private:
YDB_READONLY(bool, Enabled, true);
YDB_READONLY(ui64, MemoryLimit, ui64(3) << 30);
+ YDB_READONLY(ui64, HardMemoryLimit, ui64(10) << 30);
public:
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h b/ydb/core/tx/limiter/grouped_memory/usage/service.h
index 8192743218b..b662494d7b0 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/service.h
+++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h
@@ -15,13 +15,14 @@ class TServiceOperatorImpl {
private:
TConfig ServiceConfig = TConfig::BuildDisabledConfig();
std::shared_ptr<TCounters> Counters;
- std::shared_ptr<TStageFeatures> DefaultStageFeatures = std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, nullptr, nullptr);
+ std::shared_ptr<TStageFeatures> DefaultStageFeatures =
+ std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, ((ui64)10) << 30, nullptr, nullptr);
using TSelf = TServiceOperatorImpl<TMemoryLimiterPolicy>;
static void Register(const TConfig& serviceConfig, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
Singleton<TSelf>()->Counters = std::make_shared<TCounters>(counters, TMemoryLimiterPolicy::Name);
Singleton<TSelf>()->ServiceConfig = serviceConfig;
- Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>(
- "GLOBAL", serviceConfig.GetMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general"));
+ Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>("GLOBAL", serviceConfig.GetMemoryLimit(),
+ serviceConfig.GetHardMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general"));
}
static const TString& GetMemoryLimiterName() {
Y_ABORT_UNLESS(TMemoryLimiterPolicy::Name.size() == 4);
@@ -35,7 +36,7 @@ public:
} else {
AFL_VERIFY(Singleton<TSelf>()->DefaultStageFeatures);
return std::make_shared<TStageFeatures>(
- name, limit, Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name));
+ name, limit, Max<ui64>(), Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name));
}
}