diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-11-13 15:50:58 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-13 15:50:58 +0300 |
commit | 7d51c2b807c57d8ed232ea4305dba859d9bc0686 (patch) | |
tree | 3b7a8346e8e5a128d5fde3749bb0fb3812666281 | |
parent | f847a52da27bb02ef0d5616758accbba961c7902 (diff) | |
download | ydb-7d51c2b807c57d8ed232ea4305dba859d9bc0686.tar.gz |
native memory control (#11559)
13 files changed, 76 insertions, 16 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index fdf337e51c4..9b02a762aca 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -634,6 +634,7 @@ message TLimiterConfig { message TGroupedMemoryLimiterConfig { optional bool Enabled = 1 [default = true]; optional uint64 MemoryLimit = 2; + optional uint64 HardMemoryLimit = 3; } message TExternalIndexConfig { diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index e3ec8567b86..e885d4461dc 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -52,6 +52,7 @@ private: const TActorId ResourceSubscribeActorId; const TActorId ReadCoordinatorActorId; const TComputeShardingPolicy ComputeShardingPolicy; + TAtomic AbortFlag = 0; public: template <class T> @@ -61,6 +62,13 @@ public: return result; } + void AbortWithError(const TString& errorMessage) { + if (AtomicCas(&AbortFlag, 1, 0)) { + NActors::TActivationContext::Send( + ScanActorId, std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail(errorMessage))); + } + } + bool IsReverse() const { return ReadMetadata->IsDescSorted(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index ab231db780d..f09e0e5b9a0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -189,6 +189,14 @@ TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation( , TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) { } +void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) { + auto sourcePtr = Source.lock(); + if (sourcePtr) { + sourcePtr->GetContext()->GetCommonContext()->AbortWithError( + "cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'"); + } +} + TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace( const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h index 1fc9f8bce54..f9e5774c4ab 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.h @@ -237,7 +237,7 @@ protected: NColumnShard::TCounterGuard TasksGuard; virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override; - + virtual void DoOnAllocationImpossible(const TString& errorMessage) override; public: TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step); }; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h index bbe2d11ccb3..074a1c42f96 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h @@ -77,6 +77,9 @@ private: virtual bool DoApply(IDataReader& indexedDataRead) const override; virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override; + virtual void DoOnAllocationImpossible(const TString& errorMessage) override { + Context->GetCommonContext()->AbortWithError("cannot allocate memory for merge task: '" + errorMessage + "'"); + } public: TBaseMergeTask(const std::shared_ptr<TMergingContext>& mergingContext, const std::shared_ptr<TSpecialReadContext>& readContext) diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp index b13dbcf950d..55811c2b65d 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp @@ -195,7 +195,12 @@ TStatsIterator::TFetchingAccessorAllocation::TFetchingAccessorAllocation( , AccessorsManager(context->GetDataAccessorsManager()) , Request(request) , WaitingCountersGuard(context->GetCounters().GetFetcherAcessorsGuard()) - , OwnerId(context->GetScanActorId()) { + , OwnerId(context->GetScanActorId()) + , Context(context) { +} + +void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const TString& errorMessage) { + Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage); } } // namespace NKikimr::NOlap::NReader::NSysView::NChunks diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h index 1022f8b3f76..b932e86533f 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h @@ -109,12 +109,15 @@ private: std::shared_ptr<TDataAccessorsRequest> Request; NColumnShard::TCounterGuard WaitingCountersGuard; const NActors::TActorId OwnerId; + const std::shared_ptr<NReader::TReadContext> Context; + virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*selfPtr*/) override { Guard = std::move(guard); AccessorsManager->AskData(std::move(Request)); return true; } + virtual void DoOnAllocationImpossible(const TString& errorMessage) override; virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { if (result.HasErrors()) { diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.h b/ydb/core/tx/limiter/grouped_memory/service/allocation.h index dcbf2971367..e2ca06bd286 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/allocation.h +++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.h @@ -48,12 +48,16 @@ public: AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName()); AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())( "allocation_internal_group_id", AllocationInternalGroupId); + auto allocationResult = Stage->Allocate(AllocatedVolume); + if (allocationResult.IsFail()) { + AllocationFailed = true; + Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage()); + return false; + } const bool result = Allocation->OnAllocated( std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation); - if (result) { - Stage->Allocate(AllocatedVolume); - } else { - Stage->Free(AllocatedVolume, false); + if (!result) { + Stage->Free(AllocatedVolume, true); AllocationFailed = true; } Allocation = nullptr; diff --git a/ydb/core/tx/limiter/grouped_memory/service/counters.h b/ydb/core/tx/limiter/grouped_memory/service/counters.h index 3c96b3b8b9a..1d55b7b17f4 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/counters.h +++ b/ydb/core/tx/limiter/grouped_memory/service/counters.h @@ -10,6 +10,7 @@ private: NMonitoring::TDynamicCounters::TCounterPtr AllocatedChunks; NMonitoring::TDynamicCounters::TCounterPtr WaitingBytes; NMonitoring::TDynamicCounters::TCounterPtr WaitingChunks; + NMonitoring::TDynamicCounters::TCounterPtr AllocationFailCount; public: TStageCounters(const TCommonCountersOwner& owner, const TString& name) @@ -17,7 +18,12 @@ public: , AllocatedBytes(TBase::GetValue("Allocated/Bytes")) , AllocatedChunks(TBase::GetValue("Allocated/Count")) , WaitingBytes(TBase::GetValue("Waiting/Bytes")) - , WaitingChunks(TBase::GetValue("Waiting/Count")) { + , WaitingChunks(TBase::GetValue("Waiting/Count")) + , AllocationFailCount(TBase::GetValue("AllocationFails/Count")) { + } + + void OnCannotAllocate() { + AllocationFailCount->Add(1); } void Add(const ui64 volume, const bool allocated) { diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h index d92120f46fb..b0b1b11dce8 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h @@ -5,6 +5,7 @@ #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/actorid.h> #include <ydb/library/actors/core/log.h> +#include <ydb/library/conclusion/status.h> namespace NKikimr::NOlap::NGroupedMemoryManager { @@ -95,6 +96,7 @@ class TStageFeatures { private: YDB_READONLY_DEF(TString, Name); YDB_READONLY(ui64, Limit, 0); + YDB_READONLY(ui64, HardLimit, 0); YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage); YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting); std::shared_ptr<TStageFeatures> Owner; @@ -114,15 +116,20 @@ public: return Usage.Val() + Waiting.Val(); } - TStageFeatures( - const TString& name, const ui64 limit, const std::shared_ptr<TStageFeatures>& owner, const std::shared_ptr<TStageCounters>& counters) + TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner, + const std::shared_ptr<TStageCounters>& counters) : Name(name) , Limit(limit) + , HardLimit(hardLimit) , Owner(owner) , Counters(counters) { } - void Allocate(const ui64 volume) { + [[nodiscard]] TConclusionStatus Allocate(const ui64 volume) { + if (HardLimit < Usage.Val() + volume) { + Counters->OnCannotAllocate(); + return TConclusionStatus::Fail(TStringBuilder() << "limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ";"); + } Waiting.Sub(volume); Usage.Add(volume); if (Counters) { @@ -130,8 +137,13 @@ public: Counters->Sub(volume, false); } if (Owner) { - Owner->Allocate(volume); + const auto ownerResult = Owner->Allocate(volume); + if (ownerResult.IsFail()) { + Free(volume, true); + return ownerResult; + } } + return TConclusionStatus::Success(); } void Free(const ui64 volume, const bool allocated) { @@ -199,6 +211,7 @@ private: YDB_READONLY(ui64, Identifier, Counter.Inc()); YDB_READONLY(ui64, Memory, 0); bool Allocated = false; + virtual void DoOnAllocationImpossible(const TString& errorMessage) = 0; virtual bool DoOnAllocated( std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) = 0; @@ -216,6 +229,10 @@ public: return Allocated; } + void OnAllocationImpossible(const TString& errorMessage) { + DoOnAllocationImpossible(errorMessage); + } + [[nodiscard]] bool OnAllocated( std::shared_ptr<TAllocationGuard>&& guard, const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation); }; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/config.cpp b/ydb/core/tx/limiter/grouped_memory/usage/config.cpp index 17fe5597574..ee01e1ef3f6 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/config.cpp +++ b/ydb/core/tx/limiter/grouped_memory/usage/config.cpp @@ -7,13 +7,16 @@ bool TConfig::DeserializeFromProto(const NKikimrConfig::TGroupedMemoryLimiterCon if (config.HasMemoryLimit()) { MemoryLimit = config.GetMemoryLimit(); } + if (config.HasHardMemoryLimit()) { + HardMemoryLimit = config.GetHardMemoryLimit(); + } Enabled = config.GetEnabled(); return true; } TString TConfig::DebugString() const { TStringBuilder sb; - sb << "MemoryLimit=" << MemoryLimit << ";Enabled=" << Enabled << ";"; + sb << "MemoryLimit=" << MemoryLimit << ";HardMemoryLimit=" << HardMemoryLimit << ";Enabled=" << Enabled << ";"; return sb; } diff --git a/ydb/core/tx/limiter/grouped_memory/usage/config.h b/ydb/core/tx/limiter/grouped_memory/usage/config.h index 91a9b5bc7af..c3a69680a19 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/config.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/config.h @@ -8,6 +8,7 @@ class TConfig { private: YDB_READONLY(bool, Enabled, true); YDB_READONLY(ui64, MemoryLimit, ui64(3) << 30); + YDB_READONLY(ui64, HardMemoryLimit, ui64(10) << 30); public: diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h b/ydb/core/tx/limiter/grouped_memory/usage/service.h index 8192743218b..b662494d7b0 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/service.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h @@ -15,13 +15,14 @@ class TServiceOperatorImpl { private: TConfig ServiceConfig = TConfig::BuildDisabledConfig(); std::shared_ptr<TCounters> Counters; - std::shared_ptr<TStageFeatures> DefaultStageFeatures = std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, nullptr, nullptr); + std::shared_ptr<TStageFeatures> DefaultStageFeatures = + std::make_shared<TStageFeatures>("DEFAULT", ((ui64)3) << 30, ((ui64)10) << 30, nullptr, nullptr); using TSelf = TServiceOperatorImpl<TMemoryLimiterPolicy>; static void Register(const TConfig& serviceConfig, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) { Singleton<TSelf>()->Counters = std::make_shared<TCounters>(counters, TMemoryLimiterPolicy::Name); Singleton<TSelf>()->ServiceConfig = serviceConfig; - Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>( - "GLOBAL", serviceConfig.GetMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general")); + Singleton<TSelf>()->DefaultStageFeatures = std::make_shared<TStageFeatures>("GLOBAL", serviceConfig.GetMemoryLimit(), + serviceConfig.GetHardMemoryLimit(), nullptr, Singleton<TSelf>()->Counters->BuildStageCounters("general")); } static const TString& GetMemoryLimiterName() { Y_ABORT_UNLESS(TMemoryLimiterPolicy::Name.size() == 4); @@ -35,7 +36,7 @@ public: } else { AFL_VERIFY(Singleton<TSelf>()->DefaultStageFeatures); return std::make_shared<TStageFeatures>( - name, limit, Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name)); + name, limit, Max<ui64>(), Singleton<TSelf>()->DefaultStageFeatures, Singleton<TSelf>()->Counters->BuildStageCounters(name)); } } |