diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-01-25 13:42:10 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-01-25 13:42:10 +0300 |
commit | 98a968de4b9adb6c220244aca144824564a11093 (patch) | |
tree | d8eea6ae86b42e931de0fe5673fb09a81fd1cfaf | |
parent | f1de4af13762632983a24cd9a7ac226d6c6704e3 (diff) | |
download | ydb-98a968de4b9adb6c220244aca144824564a11093.tar.gz |
fix race for positive control counters usage (#13811)
-rw-r--r-- | ydb/core/tx/limiter/grouped_memory/service/allocation.cpp | 39 | ||||
-rw-r--r-- | ydb/core/tx/limiter/grouped_memory/service/allocation.h | 39 | ||||
-rw-r--r-- | ydb/core/tx/limiter/grouped_memory/usage/abstract.h | 124 | ||||
-rw-r--r-- | ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp | 130 | ||||
-rw-r--r-- | ydb/core/tx/limiter/grouped_memory/usage/stage_features.h | 37 | ||||
-rw-r--r-- | ydb/core/tx/limiter/grouped_memory/usage/ya.make | 1 | ||||
-rw-r--r-- | ydb/library/accessor/positive_integer.cpp | 29 | ||||
-rw-r--r-- | ydb/library/accessor/positive_integer.h | 11 |
8 files changed, 231 insertions, 179 deletions
diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp b/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp index 1dfe27953eb..768b0925dd1 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp @@ -23,4 +23,43 @@ TAllocationInfo::TAllocationInfo(const ui64 processId, const ui64 scopeId, const Stage->Add(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated); } +bool TAllocationInfo::Allocate(const NActors::TActorId& ownerId) { + AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName()); + AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())( + "allocation_internal_group_id", AllocationInternalGroupId); + auto allocationResult = Stage->Allocate(AllocatedVolume); + if (allocationResult.IsFail()) { + AllocationFailed = true; + Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage()); + Allocation = nullptr; + return false; + } + const bool result = Allocation->OnAllocated( + std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation); + if (!result) { + Stage->Free(AllocatedVolume, true); + AllocationFailed = true; + } + Allocation = nullptr; + return result; +} + +void TAllocationInfo::SetAllocatedVolume(const ui64 value) { + AFL_VERIFY(GetAllocationStatus() != EAllocationStatus::Failed); + Stage->UpdateVolume(AllocatedVolume, value, GetAllocationStatus() == EAllocationStatus::Allocated); + AllocatedVolume = value; +} + +bool TAllocationInfo::IsAllocatable(const ui64 additional) const { + return Stage->IsAllocatable(AllocatedVolume, additional); +} + +TAllocationInfo::~TAllocationInfo() { + if (GetAllocationStatus() != EAllocationStatus::Failed) { + Stage->Free(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated); + } + + AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName()); +} + } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.h b/ydb/core/tx/limiter/grouped_memory/service/allocation.h index 14b7316ac76..678a4f97f85 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/allocation.h +++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.h @@ -22,48 +22,17 @@ private: bool AllocationFailed = false; public: - ~TAllocationInfo() { - if (GetAllocationStatus() != EAllocationStatus::Failed) { - Stage->Free(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated); - } - - AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName()); - } + ~TAllocationInfo(); - bool IsAllocatable(const ui64 additional) const { - return Stage->IsAllocatable(AllocatedVolume, additional); - } + bool IsAllocatable(const ui64 additional) const; - void SetAllocatedVolume(const ui64 value) { - AFL_VERIFY(GetAllocationStatus() != EAllocationStatus::Failed); - Stage->UpdateVolume(AllocatedVolume, value, GetAllocationStatus() == EAllocationStatus::Allocated); - AllocatedVolume = value; - } + void SetAllocatedVolume(const ui64 value); ui64 GetAllocatedVolume() const { return AllocatedVolume; } - [[nodiscard]] bool Allocate(const NActors::TActorId& ownerId) { - AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName()); - AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())( - "allocation_internal_group_id", AllocationInternalGroupId); - auto allocationResult = Stage->Allocate(AllocatedVolume); - if (allocationResult.IsFail()) { - AllocationFailed = true; - Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage()); - Allocation = nullptr; - return false; - } - const bool result = Allocation->OnAllocated( - std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation); - if (!result) { - Stage->Free(AllocatedVolume, true); - AllocationFailed = true; - } - Allocation = nullptr; - return result; - } + [[nodiscard]] bool Allocate(const NActors::TActorId& ownerId); EAllocationStatus GetAllocationStatus() const { if (AllocationFailed) { diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h index 9c7c7f074d5..eeb4169eec7 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h @@ -1,4 +1,6 @@ #pragma once +#include "stage_features.h" + #include <ydb/core/tx/limiter/grouped_memory/service/counters.h> #include <ydb/library/accessor/accessor.h> @@ -10,8 +12,6 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { -class TStageFeatures; - class TGroupGuard { private: const NActors::TActorId ActorId; @@ -76,126 +76,6 @@ public: ~TAllocationGuard(); }; -class TStageFeatures { -private: - YDB_READONLY_DEF(TString, Name); - YDB_READONLY(ui64, Limit, 0); - YDB_READONLY(ui64, HardLimit, 0); - YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage); - YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting); - std::shared_ptr<TStageFeatures> Owner; - std::shared_ptr<TStageCounters> Counters; - -public: - TString DebugString() const { - TStringBuilder result; - result << "name=" << Name << ";limit=" << Limit << ";"; - if (Owner) { - result << "owner=" << Owner->DebugString() << ";"; - } - return result; - } - - ui64 GetFullMemory() const { - return Usage.Val() + Waiting.Val(); - } - - TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner, - const std::shared_ptr<TStageCounters>& counters) - : Name(name) - , Limit(limit) - , HardLimit(hardLimit) - , Owner(owner) - , Counters(counters) { - } - - [[nodiscard]] TConclusionStatus Allocate(const ui64 volume) { - Waiting.Sub(volume); - if (HardLimit < Usage.Val() + volume) { - Counters->OnCannotAllocate(); - AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "cannot_allocate")("limit", HardLimit)( - "usage", Usage.Val())("delta", volume); - return TConclusionStatus::Fail( - TStringBuilder() << Name << "::(limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ");"); - } - Usage.Add(volume); - AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "allocate")("usage", Usage.Val())("delta", volume); - if (Counters) { - Counters->Add(volume, true); - Counters->Sub(volume, false); - } - if (Owner) { - const auto ownerResult = Owner->Allocate(volume); - if (ownerResult.IsFail()) { - Free(volume, true, false); - return ownerResult; - } - } - return TConclusionStatus::Success(); - } - - void Free(const ui64 volume, const bool allocated, const bool withOwner = true) { - if (Counters) { - Counters->Sub(volume, allocated); - } - if (allocated) { - Usage.Sub(volume); - } else { - Waiting.Sub(volume); - } - AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "free")("usage", Usage.Val())("delta", volume); - - if (withOwner && Owner) { - Owner->Free(volume, allocated); - } - } - - void UpdateVolume(const ui64 from, const ui64 to, const bool allocated) { - if (Counters) { - Counters->Sub(from, allocated); - Counters->Add(to, allocated); - } - AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())( - "allocated", allocated)("from", from)("to", to); - if (allocated) { - Usage.Sub(from); - Usage.Add(to); - } else { - Waiting.Sub(from); - Waiting.Add(to); - } - - if (Owner) { - Owner->UpdateVolume(from, to, allocated); - } - } - - bool IsAllocatable(const ui64 volume, const ui64 additional) const { - if (Limit < additional + Usage.Val() + volume) { - return false; - } - if (Owner) { - return Owner->IsAllocatable(volume, additional); - } - return true; - } - - void Add(const ui64 volume, const bool allocated) { - if (Counters) { - Counters->Add(volume, allocated); - } - if (allocated) { - Usage.Add(volume); - } else { - Waiting.Add(volume); - } - - if (Owner) { - Owner->Add(volume, allocated); - } - } -}; - class IAllocation { private: static inline TAtomicCounter Counter = 0; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp new file mode 100644 index 00000000000..2ad32c82030 --- /dev/null +++ b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp @@ -0,0 +1,130 @@ +#include "stage_features.h" + +#include <ydb/library/actors/core/log.h> + +#include <util/string/builder.h> + +namespace NKikimr::NOlap::NGroupedMemoryManager { + +TString TStageFeatures::DebugString() const { + TStringBuilder result; + result << "name=" << Name << ";limit=" << Limit << ";"; + if (Owner) { + result << "owner=" << Owner->DebugString() << ";"; + } + return result; +} + +TStageFeatures::TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner, + const std::shared_ptr<TStageCounters>& counters) + : Name(name) + , Limit(limit) + , HardLimit(hardLimit) + , Owner(owner) + , Counters(counters) { +} + +TConclusionStatus TStageFeatures::Allocate(const ui64 volume) { + std::optional<TConclusionStatus> result; + { + auto* current = this; + while (current) { + current->Waiting.Sub(volume); + if (current->Counters) { + current->Counters->Sub(volume, false); + } + if (current->HardLimit < current->Usage.Val() + volume) { + if (!result) { + result = TConclusionStatus::Fail(TStringBuilder() << current->Name << "::(limit:" << current->HardLimit + << ";val:" << current->Usage.Val() << ";delta=" << volume << ");"); + } + if (current->Counters) { + current->Counters->OnCannotAllocate(); + } + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "cannot_allocate")( + "limit", current->HardLimit)("usage", current->Usage.Val())("delta", volume); + } + current = current->Owner.get(); + } + } + if (!!result) { + return *result; + } + { + auto* current = this; + while (current) { + current->Usage.Add(volume); + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "allocate")("usage", current->Usage.Val())( + "delta", volume); + if (current->Counters) { + current->Counters->Add(volume, true); + } + current = current->Owner.get(); + } + } + return TConclusionStatus::Success(); +} + +void TStageFeatures::Free(const ui64 volume, const bool allocated) { + auto* current = this; + while (current) { + if (current->Counters) { + current->Counters->Sub(volume, allocated); + } + if (allocated) { + current->Usage.Sub(volume); + } else { + current->Waiting.Sub(volume); + } + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "free")("usage", current->Usage.Val())( + "delta", volume); + current = current->Owner.get(); + } +} + +void TStageFeatures::UpdateVolume(const ui64 from, const ui64 to, const bool allocated) { + if (Counters) { + Counters->Sub(from, allocated); + Counters->Add(to, allocated); + } + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())( + "allocated", allocated)("from", from)("to", to); + if (allocated) { + Usage.Sub(from); + Usage.Add(to); + } else { + Waiting.Sub(from); + Waiting.Add(to); + } + + if (Owner) { + Owner->UpdateVolume(from, to, allocated); + } +} + +bool TStageFeatures::IsAllocatable(const ui64 volume, const ui64 additional) const { + if (Limit < additional + Usage.Val() + volume) { + return false; + } + if (Owner) { + return Owner->IsAllocatable(volume, additional); + } + return true; +} + +void TStageFeatures::Add(const ui64 volume, const bool allocated) { + if (Counters) { + Counters->Add(volume, allocated); + } + if (allocated) { + Usage.Add(volume); + } else { + Waiting.Add(volume); + } + + if (Owner) { + Owner->Add(volume, allocated); + } +} + +} // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h new file mode 100644 index 00000000000..4cb9146152f --- /dev/null +++ b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h @@ -0,0 +1,37 @@ +#pragma once +#include <ydb/core/tx/limiter/grouped_memory/service/counters.h> + +#include <ydb/library/accessor/positive_integer.h> +#include <ydb/library/conclusion/status.h> + +namespace NKikimr::NOlap::NGroupedMemoryManager { + +class TStageFeatures { +private: + YDB_READONLY_DEF(TString, Name); + YDB_READONLY(ui64, Limit, 0); + YDB_READONLY(ui64, HardLimit, 0); + YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage); + YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting); + std::shared_ptr<TStageFeatures> Owner; + std::shared_ptr<TStageCounters> Counters; + +public: + TString DebugString() const; + + ui64 GetFullMemory() const { + return Usage.Val() + Waiting.Val(); + } + + TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner, + const std::shared_ptr<TStageCounters>& counters); + + [[nodiscard]] TConclusionStatus Allocate(const ui64 volume); + + void Free(const ui64 volume, const bool allocated); + void UpdateVolume(const ui64 from, const ui64 to, const bool allocated); + bool IsAllocatable(const ui64 volume, const ui64 additional) const; + void Add(const ui64 volume, const bool allocated); +}; + +} // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/usage/ya.make b/ydb/core/tx/limiter/grouped_memory/usage/ya.make index 4295b9f8caf..64de73fcf8a 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/ya.make +++ b/ydb/core/tx/limiter/grouped_memory/usage/ya.make @@ -5,6 +5,7 @@ SRCS( config.cpp abstract.cpp service.cpp + stage_features.cpp ) PEERDIR( diff --git a/ydb/library/accessor/positive_integer.cpp b/ydb/library/accessor/positive_integer.cpp index 29723a1c097..dce6a9c33af 100644 --- a/ydb/library/accessor/positive_integer.cpp +++ b/ydb/library/accessor/positive_integer.cpp @@ -8,31 +8,30 @@ TPositiveControlInteger::TPositiveControlInteger(const i64 value) AFL_VERIFY(0 <= value); } +TPositiveControlInteger::TPositiveControlInteger(const ui64 value): Value(value) { + AFL_VERIFY(Value.Val() >= 0)("value", Value.Val())("init", value); +} + ui64 TPositiveControlInteger::Add(const ui64 value) { - Value += value; - return Value; + const i64 result = Value.Add(value); + AFL_VERIFY(result >= 0)("base", Value.Val())("delta", value)("result", result); + return result; } ui64 TPositiveControlInteger::Sub(const ui64 value) { - if (value <= Value) { - Value -= value; - } else { - AFL_VERIFY(false)("base", Value)("delta", value); - } - return Value; + i64 valDelta = Value.Sub(value); + AFL_VERIFY(valDelta >= 0)("base", Value.Val())("delta", value)("sub", valDelta); + return valDelta; } ui64 TPositiveControlInteger::GetDec() const { - if (Value) { - return Value - 1; - } else { - AFL_VERIFY(false); - } - return 0; + const i64 result = Value.Val() - 1; + AFL_VERIFY(result >= 0); + return result; } ui64 TPositiveControlInteger::Val() const { - return Value; + return Value.Val(); } } diff --git a/ydb/library/accessor/positive_integer.h b/ydb/library/accessor/positive_integer.h index a7cfab955ef..3df6e2a6e4b 100644 --- a/ydb/library/accessor/positive_integer.h +++ b/ydb/library/accessor/positive_integer.h @@ -7,13 +7,10 @@ namespace NKikimr { class TPositiveControlInteger { private: - ui64 Value = 0; + TAtomicCounter Value = 0; public: TPositiveControlInteger() = default; - TPositiveControlInteger(const ui64 value) - : Value(value) { - - } + TPositiveControlInteger(const ui64 value); TPositiveControlInteger(const ui32 value) : Value(value) { @@ -30,10 +27,10 @@ public: ui64 GetDec() const; ui64 Val() const; bool operator!() const { - return !Value; + return !Value.Val(); } operator ui64() const { - return Value; + return Value.Val(); } ui64 operator++() { return Inc(); |