summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon <[email protected]>2025-07-10 14:49:30 +0300
committerGitHub <[email protected]>2025-07-10 14:49:30 +0300
commit8fbe607156734579c3523ea260743b6cbb258e82 (patch)
tree04b2630bedd6739a659307df446f176eaa75ec9c
parent95be86ae208c6770b91f797a2836fdcb2ae499f4 (diff)
change memory control in cs data fetcher (#20732)
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp17
-rw-r--r--ydb/core/tx/columnshard/data_reader/contexts.cpp4
-rw-r--r--ydb/core/tx/columnshard/data_reader/contexts.h42
-rw-r--r--ydb/core/tx/columnshard/data_reader/fetcher.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp3
-rw-r--r--ydb/core/tx/limiter/grouped_memory/service/manager.cpp1
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/abstract.h13
-rw-r--r--ydb/core/tx/limiter/grouped_memory/usage/service.h21
10 files changed, 58 insertions, 53 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 8cd17e22acb..63a10daccf0 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -626,8 +626,11 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
compaction.Start(*this);
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
+ static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
+ NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
+ auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo,
- NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier());
+ NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard);
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
std::make_shared<TCompactionExecutor>(
@@ -747,8 +750,11 @@ bool TColumnShard::SetupTtl() {
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
for (auto&& i : indexChanges) {
i->Start(*this);
+ static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
+ NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
+ auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
NOlap::NDataFetcher::TRequestInput rInput(
- i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier());
+ i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier(), processGuard);
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
if (i->NeedConstruction()) {
NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
@@ -788,8 +794,11 @@ void TColumnShard::SetupCleanupPortions() {
changes->Start(*this);
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
- NOlap::NDataFetcher::TRequestInput rInput(
- changes->GetPortionsToAccess(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier());
+ static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
+ NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
+ auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
+ NOlap::NDataFetcher::TRequestInput rInput(changes->GetPortionsToAccess(), actualIndexInfo,
+ NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier(), processGuard);
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching(std::move(rInput),
std::make_shared<TCompactionExecutor>(
diff --git a/ydb/core/tx/columnshard/data_reader/contexts.cpp b/ydb/core/tx/columnshard/data_reader/contexts.cpp
index 14b9b5db535..e660ffef129 100644
--- a/ydb/core/tx/columnshard/data_reader/contexts.cpp
+++ b/ydb/core/tx/columnshard/data_reader/contexts.cpp
@@ -15,10 +15,10 @@ IFetchingStep::EStepResult IFetchingStep::Execute(const std::shared_ptr<TPortion
TRequestInput::TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<const TVersionedIndex>& versions,
const NBlobOperations::EConsumer consumer, const TString& externalTaskId,
- const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo)
+ const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard)
: Consumer(consumer)
, ExternalTaskId(externalTaskId)
- , MemoryProcessInfo(memoryProcessInfo ? *memoryProcessInfo : TFetcherMemoryProcessInfo())
+ , MemoryProcessGuard(memoryProcessGuard)
{
AFL_VERIFY(portions.size());
ActualSchema = versions->GetLastSchema();
diff --git a/ydb/core/tx/columnshard/data_reader/contexts.h b/ydb/core/tx/columnshard/data_reader/contexts.h
index d82e803c047..6b3717577c1 100644
--- a/ydb/core/tx/columnshard/data_reader/contexts.h
+++ b/ydb/core/tx/columnshard/data_reader/contexts.h
@@ -21,41 +21,28 @@ enum class EFetchingStage : ui32 {
Error
};
-class TFetcherMemoryProcessInfo {
-private:
- static inline TAtomicCounter Counter = 0;
- ui64 MemoryProcessId = Counter.Inc();
-
-public:
- ui64 GetMemoryProcessId() const {
- return MemoryProcessId;
- }
-};
-
class TCurrentContext: TMoveOnly {
private:
std::optional<std::vector<TPortionDataAccessor>> Accessors;
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
std::shared_ptr<NGroupedMemoryManager::TProcessGuard> MemoryProcessGuard;
- std::shared_ptr<NGroupedMemoryManager::TScopeGuard> MemoryProcessScopeGuard;
- std::shared_ptr<NGroupedMemoryManager::TGroupGuard> MemoryProcessGroupGuard;
- TFetcherMemoryProcessInfo MemoryProcessInfo;
+ std::shared_ptr<NGroupedMemoryManager::TScopeGuard> MemoryScopeGuard;
+ std::shared_ptr<NGroupedMemoryManager::TGroupGuard> MemoryGroupGuard;
std::optional<NBlobOperations::NRead::TCompositeReadBlobs> Blobs;
std::optional<std::vector<NArrow::TGeneralContainer>> AssembledData;
+ inline static TAtomicCounter MemoryScopeIdCounter = 0;
public:
ui64 GetMemoryProcessId() const {
- return MemoryProcessInfo.GetMemoryProcessId();
+ return MemoryProcessGuard->GetProcessId();
}
ui64 GetMemoryScopeId() const {
- AFL_VERIFY(!!MemoryProcessScopeGuard);
- return MemoryProcessScopeGuard->GetScopeId();
+ return MemoryScopeGuard->GetScopeId();
}
ui64 GetMemoryGroupId() const {
- AFL_VERIFY(!!MemoryProcessGroupGuard);
- return MemoryProcessGroupGuard->GetGroupId();
+ return MemoryGroupGuard->GetGroupId();
}
void SetBlobs(NBlobOperations::NRead::TCompositeReadBlobs&& blobs) {
@@ -91,15 +78,12 @@ public:
return result;
}
- TCurrentContext(const TFetcherMemoryProcessInfo& memoryProcessInfo)
- : MemoryProcessInfo(memoryProcessInfo)
+ TCurrentContext(const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard)
+ : MemoryProcessGuard(memoryProcessGuard)
{
- static std::shared_ptr<NGroupedMemoryManager::TStageFeatures> stageFeatures =
- NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
-
- MemoryProcessGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard(GetMemoryProcessId(), { stageFeatures });
- MemoryProcessScopeGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildScopeGuard(GetMemoryProcessId(), 1);
- MemoryProcessGroupGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildGroupGuard(GetMemoryProcessId(), 1);
+ AFL_VERIFY(MemoryProcessGuard);
+ MemoryScopeGuard = MemoryProcessGuard->BuildScopeGuard(MemoryScopeIdCounter.Inc());
+ MemoryGroupGuard = MemoryScopeGuard->BuildGroupGuard();
}
void SetPortionAccessors(std::vector<TPortionDataAccessor>&& acc) {
@@ -261,12 +245,12 @@ private:
YDB_READONLY_DEF(std::shared_ptr<ISnapshotSchema>, ActualSchema);
YDB_READONLY(NBlobOperations::EConsumer, Consumer, NBlobOperations::EConsumer::UNDEFINED);
YDB_READONLY_DEF(TString, ExternalTaskId);
- YDB_READONLY_DEF(TFetcherMemoryProcessInfo, MemoryProcessInfo);
+ YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TProcessGuard>, MemoryProcessGuard);
public:
TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<const TVersionedIndex>& versions,
const NBlobOperations::EConsumer consumer, const TString& externalTaskId,
- const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo = std::nullopt);
+ const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard);
};
} // namespace NKikimr::NOlap::NDataFetcher
diff --git a/ydb/core/tx/columnshard/data_reader/fetcher.h b/ydb/core/tx/columnshard/data_reader/fetcher.h
index 50bb0f4ee11..53c6e55a356 100644
--- a/ydb/core/tx/columnshard/data_reader/fetcher.h
+++ b/ydb/core/tx/columnshard/data_reader/fetcher.h
@@ -52,7 +52,7 @@ public:
class TPortionsDataFetcher: TNonCopyable {
private:
- const TRequestInput Input;
+ TRequestInput Input;
const std::shared_ptr<IFetchCallback> Callback;
std::shared_ptr<TClassCounters> ClassCounters;
NCounters::TStateSignalsOperator<EFetchingStage>::TGuard Guard;
@@ -80,7 +80,7 @@ public:
, ClassCounters(Singleton<TCounters>()->GetClassCounters(Callback->GetClassName()))
, Guard(ClassCounters->GetGuard(EFetchingStage::Created))
, Script(script)
- , CurrentContext(input.GetMemoryProcessInfo())
+ , CurrentContext(Input.GetMemoryProcessGuard())
, Environment(environment)
, ConveyorCategory(conveyorCategory)
{
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp
index bfa6b64c88c..657da7928a1 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp
@@ -40,8 +40,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit)
};
ProcessMemoryGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages);
- ProcessScopeGuard =
- NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId());
+ ProcessScopeGuard = ProcessMemoryGuard->BuildScopeGuard(GetCommonContext()->GetScanId());
auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp
index a431b81077f..00b5e6f89df 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp
@@ -38,8 +38,7 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi
, TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
, Sources(sources)
, IntervalIdx(intervalIdx)
- , IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
- Context->GetProcessMemoryControlId(), context->GetCommonContext()->GetScanId()))
+ , IntervalGroupGuard(Context->GetProcessScopeGuard()->BuildGroupGuard())
, IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard()) {
AFL_VERIFY(Sources.size());
for (auto&& [_, i] : Sources) {
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
index a25c1df37cd..6122d4cd7b4 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp
@@ -33,8 +33,7 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr)
InitStageData(std::make_unique<TFetchedData>(
GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCountOptional()));
ProcessingStarted = true;
- SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
- GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
+ SourceGroupGuard = GetContext()->GetProcessScopeGuard()->BuildGroupGuard();
SetMemoryGroupId(SourceGroupGuard->GetGroupId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx());
// NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
diff --git a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp
index 53499aae026..3bbab05256c 100644
--- a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp
+++ b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp
@@ -96,6 +96,7 @@ void TManager::RegisterAllocation(const ui64 externalProcessId, const ui64 exter
process->RegisterAllocation(externalScopeId, externalGroupId, task, stageIdx);
} else {
AFL_VERIFY(!task->OnAllocated(std::make_shared<TAllocationGuard>(externalProcessId, externalScopeId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))(
+ "process", externalProcessId)("scope", externalScopeId)(
"ext_group", externalGroupId)("stage_idx", stageIdx);
}
RefreshSignals();
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
index eeb4169eec7..6dbd78ce9d0 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
+++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h
@@ -12,6 +12,10 @@
namespace NKikimr::NOlap::NGroupedMemoryManager {
+class TGroupGuard;
+class TScopeGuard;
+class TProcessGuard;
+
class TGroupGuard {
private:
const NActors::TActorId ActorId;
@@ -33,6 +37,10 @@ private:
public:
TProcessGuard(const NActors::TActorId& actorId, const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages);
+ std::shared_ptr<TScopeGuard> BuildScopeGuard(const ui32 scopeId) const {
+ return std::make_shared<TScopeGuard>(ActorId, ProcessId, scopeId);
+ }
+
~TProcessGuard();
};
@@ -45,6 +53,11 @@ private:
public:
TScopeGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 scopeId);
+ std::shared_ptr<TGroupGuard> BuildGroupGuard() const {
+ static TAtomicCounter counter = 0;
+ return std::make_shared<TGroupGuard>(ActorId, ProcessId, ScopeId, counter.Inc());
+ }
+
~TScopeGuard();
};
diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h b/ydb/core/tx/limiter/grouped_memory/usage/service.h
index 8e7608bc115..d119cbf7df0 100644
--- a/ydb/core/tx/limiter/grouped_memory/usage/service.h
+++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h
@@ -14,6 +14,7 @@ namespace NKikimr::NOlap::NGroupedMemoryManager {
template <class TMemoryLimiterPolicy>
class TServiceOperatorImpl {
private:
+ TAtomicCounter LastProcessId = 0;
TConfig ServiceConfig = TConfig::BuildDisabledConfig();
std::shared_ptr<TCounters> Counters;
std::shared_ptr<TStageFeatures> DefaultStageFeatures =
@@ -50,20 +51,18 @@ public:
return Singleton<TSelf>()->DefaultStageFeatures;
}
- static std::shared_ptr<TGroupGuard> BuildGroupGuard(const ui64 processId, const ui32 scopeId) {
- static TAtomicCounter counter = 0;
+ static std::shared_ptr<TProcessGuard> BuildProcessGuard(const std::vector<std::shared_ptr<TStageFeatures>>& stages)
+ requires(!TMemoryLimiterPolicy::ExternalProcessIdAllocation)
+ {
+ ui64 processId = Singleton<TSelf>()->LastProcessId.Inc();
auto& context = NActors::TActorContext::AsActorContext();
const NActors::TActorId& selfId = context.SelfID;
- return std::make_shared<TGroupGuard>(MakeServiceId(selfId.NodeId()), processId, scopeId, counter.Inc());
- }
-
- static std::shared_ptr<TScopeGuard> BuildScopeGuard(const ui64 processId, const ui32 scopeId) {
- auto& context = NActors::TActorContext::AsActorContext();
- const NActors::TActorId& selfId = context.SelfID;
- return std::make_shared<TScopeGuard>(MakeServiceId(selfId.NodeId()), processId, scopeId);
+ return std::make_shared<TProcessGuard>(MakeServiceId(selfId.NodeId()), processId, stages);
}
- static std::shared_ptr<TProcessGuard> BuildProcessGuard(const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages) {
+ static std::shared_ptr<TProcessGuard> BuildProcessGuard(const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages)
+ requires(TMemoryLimiterPolicy::ExternalProcessIdAllocation)
+ {
auto& context = NActors::TActorContext::AsActorContext();
const NActors::TActorId& selfId = context.SelfID;
return std::make_shared<TProcessGuard>(MakeServiceId(selfId.NodeId()), processId, stages);
@@ -102,6 +101,7 @@ class TScanMemoryLimiterPolicy {
public:
static const inline TString Name = "Scan";
static const inline NMemory::EMemoryConsumerKind ConsumerKind = NMemory::EMemoryConsumerKind::ScanGroupedMemoryLimiter;
+ static constexpr bool ExternalProcessIdAllocation = true;
};
using TScanMemoryLimiterOperator = TServiceOperatorImpl<TScanMemoryLimiterPolicy>;
@@ -110,6 +110,7 @@ class TCompMemoryLimiterPolicy {
public:
static const inline TString Name = "Comp";
static const inline NMemory::EMemoryConsumerKind ConsumerKind = NMemory::EMemoryConsumerKind::CompGroupedMemoryLimiter;
+ static constexpr bool ExternalProcessIdAllocation = false;
};
using TCompMemoryLimiterOperator = TServiceOperatorImpl<TCompMemoryLimiterPolicy>;