aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-01-25 13:42:10 +0300
committerGitHub <noreply@github.com>2025-01-25 13:42:10 +0300
commit98a968de4b9adb6c220244aca144824564a11093 (patch)
treed8eea6ae86b42e931de0fe5673fb09a81fd1cfaf
parentf1de4af13762632983a24cd9a7ac226d6c6704e3 (diff)
downloadydb-98a968de4b9adb6c220244aca144824564a11093.tar.gz
fix race for positive control counters usage (#13811)
-rw-r--r--ydb/core/tx/limiter/grouped_memory/service/allocation.cpp39
-rw-r--r--ydb/core/tx/limiter/grouped_memory/service/allocation.h39
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/abstract.h124
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp130
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/stage_features.h37
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/ya.make1
-rw-r--r--ydb/library/accessor/positive_integer.cpp29
-rw-r--r--ydb/library/accessor/positive_integer.h11
8 files changed, 231 insertions, 179 deletions
diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp b/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp
index 1dfe27953eb..768b0925dd1 100644
--- a/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp
+++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp
@@ -23,4 +23,43 @@ TAllocationInfo::TAllocationInfo(const ui64 processId, const ui64 scopeId, const
Stage->Add(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated);
}
+bool TAllocationInfo::Allocate(const NActors::TActorId& ownerId) {
+ AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
+ AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
+ "allocation_internal_group_id", AllocationInternalGroupId);
+ auto allocationResult = Stage->Allocate(AllocatedVolume);
+ if (allocationResult.IsFail()) {
+ AllocationFailed = true;
+ Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
+ Allocation = nullptr;
+ return false;
+ }
+ const bool result = Allocation->OnAllocated(
+ std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
+ if (!result) {
+ Stage->Free(AllocatedVolume, true);
+ AllocationFailed = true;
+ }
+ Allocation = nullptr;
+ return result;
+}
+
+void TAllocationInfo::SetAllocatedVolume(const ui64 value) {
+ AFL_VERIFY(GetAllocationStatus() != EAllocationStatus::Failed);
+ Stage->UpdateVolume(AllocatedVolume, value, GetAllocationStatus() == EAllocationStatus::Allocated);
+ AllocatedVolume = value;
+}
+
+bool TAllocationInfo::IsAllocatable(const ui64 additional) const {
+ return Stage->IsAllocatable(AllocatedVolume, additional);
+}
+
+TAllocationInfo::~TAllocationInfo() {
+ if (GetAllocationStatus() != EAllocationStatus::Failed) {
+ Stage->Free(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated);
+ }
+
+ AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName());
+}
+
} // namespace NKikimr::NOlap::NGroupedMemoryManager
diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.h b/ydb/core/tx/limiter/grouped_memory/service/allocation.h
index 14b7316ac76..678a4f97f85 100644
--- a/ydb/core/tx/limiter/grouped_memory/service/allocation.h
+++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.h
@@ -22,48 +22,17 @@ private:
bool AllocationFailed = false;
public:
- ~TAllocationInfo() {
- if (GetAllocationStatus() != EAllocationStatus::Failed) {
- Stage->Free(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated);
- }
-
- AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName());
- }
+ ~TAllocationInfo();
- bool IsAllocatable(const ui64 additional) const {
- return Stage->IsAllocatable(AllocatedVolume, additional);
- }
+ bool IsAllocatable(const ui64 additional) const;
- void SetAllocatedVolume(const ui64 value) {
- AFL_VERIFY(GetAllocationStatus() != EAllocationStatus::Failed);
- Stage->UpdateVolume(AllocatedVolume, value, GetAllocationStatus() == EAllocationStatus::Allocated);
- AllocatedVolume = value;
- }
+ void SetAllocatedVolume(const ui64 value);
ui64 GetAllocatedVolume() const {
return AllocatedVolume;
}
- [[nodiscard]] bool Allocate(const NActors::TActorId& ownerId) {
- AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName());
- AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())(
- "allocation_internal_group_id", AllocationInternalGroupId);
- auto allocationResult = Stage->Allocate(AllocatedVolume);
- if (allocationResult.IsFail()) {
- AllocationFailed = true;
- Allocation->OnAllocationImpossible(allocationResult.GetErrorMessage());
- Allocation = nullptr;
- return false;
- }
- const bool result = Allocation->OnAllocated(
- std::make_shared<TAllocationGuard>(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation);
- if (!result) {
- Stage->Free(AllocatedVolume, true);
- AllocationFailed = true;
- }
- Allocation = nullptr;
- return result;
- }
+ [[nodiscard]] bool Allocate(const NActors::TActorId& ownerId);
EAllocationStatus GetAllocationStatus() const {
if (AllocationFailed) {
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
index 9c7c7f074d5..eeb4169eec7 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
+++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
@@ -1,4 +1,6 @@
#pragma once
+#include "stage_features.h"
+
#include <ydb/core/tx/limiter/grouped_memory/service/counters.h>
#include <ydb/library/accessor/accessor.h>
@@ -10,8 +12,6 @@
namespace NKikimr::NOlap::NGroupedMemoryManager {
-class TStageFeatures;
-
class TGroupGuard {
private:
const NActors::TActorId ActorId;
@@ -76,126 +76,6 @@ public:
~TAllocationGuard();
};
-class TStageFeatures {
-private:
- YDB_READONLY_DEF(TString, Name);
- YDB_READONLY(ui64, Limit, 0);
- YDB_READONLY(ui64, HardLimit, 0);
- YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage);
- YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting);
- std::shared_ptr<TStageFeatures> Owner;
- std::shared_ptr<TStageCounters> Counters;
-
-public:
- TString DebugString() const {
- TStringBuilder result;
- result << "name=" << Name << ";limit=" << Limit << ";";
- if (Owner) {
- result << "owner=" << Owner->DebugString() << ";";
- }
- return result;
- }
-
- ui64 GetFullMemory() const {
- return Usage.Val() + Waiting.Val();
- }
-
- TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
- const std::shared_ptr<TStageCounters>& counters)
- : Name(name)
- , Limit(limit)
- , HardLimit(hardLimit)
- , Owner(owner)
- , Counters(counters) {
- }
-
- [[nodiscard]] TConclusionStatus Allocate(const ui64 volume) {
- Waiting.Sub(volume);
- if (HardLimit < Usage.Val() + volume) {
- Counters->OnCannotAllocate();
- AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "cannot_allocate")("limit", HardLimit)(
- "usage", Usage.Val())("delta", volume);
- return TConclusionStatus::Fail(
- TStringBuilder() << Name << "::(limit:" << HardLimit << ";val:" << Usage.Val() << ";delta=" << volume << ");");
- }
- Usage.Add(volume);
- AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "allocate")("usage", Usage.Val())("delta", volume);
- if (Counters) {
- Counters->Add(volume, true);
- Counters->Sub(volume, false);
- }
- if (Owner) {
- const auto ownerResult = Owner->Allocate(volume);
- if (ownerResult.IsFail()) {
- Free(volume, true, false);
- return ownerResult;
- }
- }
- return TConclusionStatus::Success();
- }
-
- void Free(const ui64 volume, const bool allocated, const bool withOwner = true) {
- if (Counters) {
- Counters->Sub(volume, allocated);
- }
- if (allocated) {
- Usage.Sub(volume);
- } else {
- Waiting.Sub(volume);
- }
- AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "free")("usage", Usage.Val())("delta", volume);
-
- if (withOwner && Owner) {
- Owner->Free(volume, allocated);
- }
- }
-
- void UpdateVolume(const ui64 from, const ui64 to, const bool allocated) {
- if (Counters) {
- Counters->Sub(from, allocated);
- Counters->Add(to, allocated);
- }
- AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())(
- "allocated", allocated)("from", from)("to", to);
- if (allocated) {
- Usage.Sub(from);
- Usage.Add(to);
- } else {
- Waiting.Sub(from);
- Waiting.Add(to);
- }
-
- if (Owner) {
- Owner->UpdateVolume(from, to, allocated);
- }
- }
-
- bool IsAllocatable(const ui64 volume, const ui64 additional) const {
- if (Limit < additional + Usage.Val() + volume) {
- return false;
- }
- if (Owner) {
- return Owner->IsAllocatable(volume, additional);
- }
- return true;
- }
-
- void Add(const ui64 volume, const bool allocated) {
- if (Counters) {
- Counters->Add(volume, allocated);
- }
- if (allocated) {
- Usage.Add(volume);
- } else {
- Waiting.Add(volume);
- }
-
- if (Owner) {
- Owner->Add(volume, allocated);
- }
- }
-};
-
class IAllocation {
private:
static inline TAtomicCounter Counter = 0;
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp
new file mode 100644
index 00000000000..2ad32c82030
--- /dev/null
+++ b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp
@@ -0,0 +1,130 @@
+#include "stage_features.h"
+
+#include <ydb/library/actors/core/log.h>
+
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap::NGroupedMemoryManager {
+
+TString TStageFeatures::DebugString() const {
+ TStringBuilder result;
+ result << "name=" << Name << ";limit=" << Limit << ";";
+ if (Owner) {
+ result << "owner=" << Owner->DebugString() << ";";
+ }
+ return result;
+}
+
+TStageFeatures::TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
+ const std::shared_ptr<TStageCounters>& counters)
+ : Name(name)
+ , Limit(limit)
+ , HardLimit(hardLimit)
+ , Owner(owner)
+ , Counters(counters) {
+}
+
+TConclusionStatus TStageFeatures::Allocate(const ui64 volume) {
+ std::optional<TConclusionStatus> result;
+ {
+ auto* current = this;
+ while (current) {
+ current->Waiting.Sub(volume);
+ if (current->Counters) {
+ current->Counters->Sub(volume, false);
+ }
+ if (current->HardLimit < current->Usage.Val() + volume) {
+ if (!result) {
+ result = TConclusionStatus::Fail(TStringBuilder() << current->Name << "::(limit:" << current->HardLimit
+ << ";val:" << current->Usage.Val() << ";delta=" << volume << ");");
+ }
+ if (current->Counters) {
+ current->Counters->OnCannotAllocate();
+ }
+ AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "cannot_allocate")(
+ "limit", current->HardLimit)("usage", current->Usage.Val())("delta", volume);
+ }
+ current = current->Owner.get();
+ }
+ }
+ if (!!result) {
+ return *result;
+ }
+ {
+ auto* current = this;
+ while (current) {
+ current->Usage.Add(volume);
+ AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "allocate")("usage", current->Usage.Val())(
+ "delta", volume);
+ if (current->Counters) {
+ current->Counters->Add(volume, true);
+ }
+ current = current->Owner.get();
+ }
+ }
+ return TConclusionStatus::Success();
+}
+
+void TStageFeatures::Free(const ui64 volume, const bool allocated) {
+ auto* current = this;
+ while (current) {
+ if (current->Counters) {
+ current->Counters->Sub(volume, allocated);
+ }
+ if (allocated) {
+ current->Usage.Sub(volume);
+ } else {
+ current->Waiting.Sub(volume);
+ }
+ AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "free")("usage", current->Usage.Val())(
+ "delta", volume);
+ current = current->Owner.get();
+ }
+}
+
+void TStageFeatures::UpdateVolume(const ui64 from, const ui64 to, const bool allocated) {
+ if (Counters) {
+ Counters->Sub(from, allocated);
+ Counters->Add(to, allocated);
+ }
+ AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", Name)("event", "update")("usage", Usage.Val())("waiting", Waiting.Val())(
+ "allocated", allocated)("from", from)("to", to);
+ if (allocated) {
+ Usage.Sub(from);
+ Usage.Add(to);
+ } else {
+ Waiting.Sub(from);
+ Waiting.Add(to);
+ }
+
+ if (Owner) {
+ Owner->UpdateVolume(from, to, allocated);
+ }
+}
+
+bool TStageFeatures::IsAllocatable(const ui64 volume, const ui64 additional) const {
+ if (Limit < additional + Usage.Val() + volume) {
+ return false;
+ }
+ if (Owner) {
+ return Owner->IsAllocatable(volume, additional);
+ }
+ return true;
+}
+
+void TStageFeatures::Add(const ui64 volume, const bool allocated) {
+ if (Counters) {
+ Counters->Add(volume, allocated);
+ }
+ if (allocated) {
+ Usage.Add(volume);
+ } else {
+ Waiting.Add(volume);
+ }
+
+ if (Owner) {
+ Owner->Add(volume, allocated);
+ }
+}
+
+} // namespace NKikimr::NOlap::NGroupedMemoryManager
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h
new file mode 100644
index 00000000000..4cb9146152f
--- /dev/null
+++ b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h
@@ -0,0 +1,37 @@
+#pragma once
+#include <ydb/core/tx/limiter/grouped_memory/service/counters.h>
+
+#include <ydb/library/accessor/positive_integer.h>
+#include <ydb/library/conclusion/status.h>
+
+namespace NKikimr::NOlap::NGroupedMemoryManager {
+
+class TStageFeatures {
+private:
+ YDB_READONLY_DEF(TString, Name);
+ YDB_READONLY(ui64, Limit, 0);
+ YDB_READONLY(ui64, HardLimit, 0);
+ YDB_ACCESSOR_DEF(TPositiveControlInteger, Usage);
+ YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting);
+ std::shared_ptr<TStageFeatures> Owner;
+ std::shared_ptr<TStageCounters> Counters;
+
+public:
+ TString DebugString() const;
+
+ ui64 GetFullMemory() const {
+ return Usage.Val() + Waiting.Val();
+ }
+
+ TStageFeatures(const TString& name, const ui64 limit, const ui64 hardLimit, const std::shared_ptr<TStageFeatures>& owner,
+ const std::shared_ptr<TStageCounters>& counters);
+
+ [[nodiscard]] TConclusionStatus Allocate(const ui64 volume);
+
+ void Free(const ui64 volume, const bool allocated);
+ void UpdateVolume(const ui64 from, const ui64 to, const bool allocated);
+ bool IsAllocatable(const ui64 volume, const ui64 additional) const;
+ void Add(const ui64 volume, const bool allocated);
+};
+
+} // namespace NKikimr::NOlap::NGroupedMemoryManager
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/ya.make b/ydb/core/tx/limiter/grouped_memory/usage/ya.make
index 4295b9f8caf..64de73fcf8a 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/ya.make
+++ b/ydb/core/tx/limiter/grouped_memory/usage/ya.make
@@ -5,6 +5,7 @@ SRCS(
config.cpp
abstract.cpp
service.cpp
+ stage_features.cpp
)
PEERDIR(
diff --git a/ydb/library/accessor/positive_integer.cpp b/ydb/library/accessor/positive_integer.cpp
index 29723a1c097..dce6a9c33af 100644
--- a/ydb/library/accessor/positive_integer.cpp
+++ b/ydb/library/accessor/positive_integer.cpp
@@ -8,31 +8,30 @@ TPositiveControlInteger::TPositiveControlInteger(const i64 value)
AFL_VERIFY(0 <= value);
}
+TPositiveControlInteger::TPositiveControlInteger(const ui64 value): Value(value) {
+ AFL_VERIFY(Value.Val() >= 0)("value", Value.Val())("init", value);
+}
+
ui64 TPositiveControlInteger::Add(const ui64 value) {
- Value += value;
- return Value;
+ const i64 result = Value.Add(value);
+ AFL_VERIFY(result >= 0)("base", Value.Val())("delta", value)("result", result);
+ return result;
}
ui64 TPositiveControlInteger::Sub(const ui64 value) {
- if (value <= Value) {
- Value -= value;
- } else {
- AFL_VERIFY(false)("base", Value)("delta", value);
- }
- return Value;
+ i64 valDelta = Value.Sub(value);
+ AFL_VERIFY(valDelta >= 0)("base", Value.Val())("delta", value)("sub", valDelta);
+ return valDelta;
}
ui64 TPositiveControlInteger::GetDec() const {
- if (Value) {
- return Value - 1;
- } else {
- AFL_VERIFY(false);
- }
- return 0;
+ const i64 result = Value.Val() - 1;
+ AFL_VERIFY(result >= 0);
+ return result;
}
ui64 TPositiveControlInteger::Val() const {
- return Value;
+ return Value.Val();
}
}
diff --git a/ydb/library/accessor/positive_integer.h b/ydb/library/accessor/positive_integer.h
index a7cfab955ef..3df6e2a6e4b 100644
--- a/ydb/library/accessor/positive_integer.h
+++ b/ydb/library/accessor/positive_integer.h
@@ -7,13 +7,10 @@ namespace NKikimr {
class TPositiveControlInteger {
private:
- ui64 Value = 0;
+ TAtomicCounter Value = 0;
public:
TPositiveControlInteger() = default;
- TPositiveControlInteger(const ui64 value)
- : Value(value) {
-
- }
+ TPositiveControlInteger(const ui64 value);
TPositiveControlInteger(const ui32 value)
: Value(value) {
@@ -30,10 +27,10 @@ public:
ui64 GetDec() const;
ui64 Val() const;
bool operator!() const {
- return !Value;
+ return !Value.Val();
}
operator ui64() const {
- return Value;
+ return Value.Val();
}
ui64 operator++() {
return Inc();