diff options
| author | Semyon <[email protected]> | 2025-07-10 14:49:30 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-07-10 14:49:30 +0300 |
| commit | 8fbe607156734579c3523ea260743b6cbb258e82 (patch) | |
| tree | 04b2630bedd6739a659307df446f176eaa75ec9c | |
| parent | 95be86ae208c6770b91f797a2836fdcb2ae499f4 (diff) | |
change memory control in cs data fetcher (#20732)
10 files changed, 58 insertions, 53 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 8cd17e22acb..63a10daccf0 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -626,8 +626,11 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo compaction.Start(*this); auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); + static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); + auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo, - NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier()); + NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard); auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput), std::make_shared<TCompactionExecutor>( @@ -747,8 +750,11 @@ bool TColumnShard::SetupTtl() { auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); for (auto&& i : indexChanges) { i->Start(*this); + static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); + auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); NOlap::NDataFetcher::TRequestInput rInput( - i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier()); + i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier(), processGuard); auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); if (i->NeedConstruction()) { NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput), @@ -788,8 +794,11 @@ void TColumnShard::SetupCleanupPortions() { changes->Start(*this); auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); - NOlap::NDataFetcher::TRequestInput rInput( - changes->GetPortionsToAccess(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier()); + static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); + auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); + NOlap::NDataFetcher::TRequestInput rInput(changes->GetPortionsToAccess(), actualIndexInfo, + NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier(), processGuard); auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching(std::move(rInput), std::make_shared<TCompactionExecutor>( diff --git a/ydb/core/tx/columnshard/data_reader/contexts.cpp b/ydb/core/tx/columnshard/data_reader/contexts.cpp index 14b9b5db535..e660ffef129 100644 --- a/ydb/core/tx/columnshard/data_reader/contexts.cpp +++ b/ydb/core/tx/columnshard/data_reader/contexts.cpp @@ -15,10 +15,10 @@ IFetchingStep::EStepResult IFetchingStep::Execute(const std::shared_ptr<TPortion TRequestInput::TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<const TVersionedIndex>& versions, const NBlobOperations::EConsumer consumer, const TString& externalTaskId, - const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo) + const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard) : Consumer(consumer) , ExternalTaskId(externalTaskId) - , MemoryProcessInfo(memoryProcessInfo ? *memoryProcessInfo : TFetcherMemoryProcessInfo()) + , MemoryProcessGuard(memoryProcessGuard) { AFL_VERIFY(portions.size()); ActualSchema = versions->GetLastSchema(); diff --git a/ydb/core/tx/columnshard/data_reader/contexts.h b/ydb/core/tx/columnshard/data_reader/contexts.h index d82e803c047..6b3717577c1 100644 --- a/ydb/core/tx/columnshard/data_reader/contexts.h +++ b/ydb/core/tx/columnshard/data_reader/contexts.h @@ -21,41 +21,28 @@ enum class EFetchingStage : ui32 { Error }; -class TFetcherMemoryProcessInfo { -private: - static inline TAtomicCounter Counter = 0; - ui64 MemoryProcessId = Counter.Inc(); - -public: - ui64 GetMemoryProcessId() const { - return MemoryProcessId; - } -}; - class TCurrentContext: TMoveOnly { private: std::optional<std::vector<TPortionDataAccessor>> Accessors; YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards); std::shared_ptr<NGroupedMemoryManager::TProcessGuard> MemoryProcessGuard; - std::shared_ptr<NGroupedMemoryManager::TScopeGuard> MemoryProcessScopeGuard; - std::shared_ptr<NGroupedMemoryManager::TGroupGuard> MemoryProcessGroupGuard; - TFetcherMemoryProcessInfo MemoryProcessInfo; + std::shared_ptr<NGroupedMemoryManager::TScopeGuard> MemoryScopeGuard; + std::shared_ptr<NGroupedMemoryManager::TGroupGuard> MemoryGroupGuard; std::optional<NBlobOperations::NRead::TCompositeReadBlobs> Blobs; std::optional<std::vector<NArrow::TGeneralContainer>> AssembledData; + inline static TAtomicCounter MemoryScopeIdCounter = 0; public: ui64 GetMemoryProcessId() const { - return MemoryProcessInfo.GetMemoryProcessId(); + return MemoryProcessGuard->GetProcessId(); } ui64 GetMemoryScopeId() const { - AFL_VERIFY(!!MemoryProcessScopeGuard); - return MemoryProcessScopeGuard->GetScopeId(); + return MemoryScopeGuard->GetScopeId(); } ui64 GetMemoryGroupId() const { - AFL_VERIFY(!!MemoryProcessGroupGuard); - return MemoryProcessGroupGuard->GetGroupId(); + return MemoryGroupGuard->GetGroupId(); } void SetBlobs(NBlobOperations::NRead::TCompositeReadBlobs&& blobs) { @@ -91,15 +78,12 @@ public: return result; } - TCurrentContext(const TFetcherMemoryProcessInfo& memoryProcessInfo) - : MemoryProcessInfo(memoryProcessInfo) + TCurrentContext(const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard) + : MemoryProcessGuard(memoryProcessGuard) { - static std::shared_ptr<NGroupedMemoryManager::TStageFeatures> stageFeatures = - NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); - - MemoryProcessGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard(GetMemoryProcessId(), { stageFeatures }); - MemoryProcessScopeGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildScopeGuard(GetMemoryProcessId(), 1); - MemoryProcessGroupGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildGroupGuard(GetMemoryProcessId(), 1); + AFL_VERIFY(MemoryProcessGuard); + MemoryScopeGuard = MemoryProcessGuard->BuildScopeGuard(MemoryScopeIdCounter.Inc()); + MemoryGroupGuard = MemoryScopeGuard->BuildGroupGuard(); } void SetPortionAccessors(std::vector<TPortionDataAccessor>&& acc) { @@ -261,12 +245,12 @@ private: YDB_READONLY_DEF(std::shared_ptr<ISnapshotSchema>, ActualSchema); YDB_READONLY(NBlobOperations::EConsumer, Consumer, NBlobOperations::EConsumer::UNDEFINED); YDB_READONLY_DEF(TString, ExternalTaskId); - YDB_READONLY_DEF(TFetcherMemoryProcessInfo, MemoryProcessInfo); + YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TProcessGuard>, MemoryProcessGuard); public: TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<const TVersionedIndex>& versions, const NBlobOperations::EConsumer consumer, const TString& externalTaskId, - const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo = std::nullopt); + const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard); }; } // namespace NKikimr::NOlap::NDataFetcher diff --git a/ydb/core/tx/columnshard/data_reader/fetcher.h b/ydb/core/tx/columnshard/data_reader/fetcher.h index 50bb0f4ee11..53c6e55a356 100644 --- a/ydb/core/tx/columnshard/data_reader/fetcher.h +++ b/ydb/core/tx/columnshard/data_reader/fetcher.h @@ -52,7 +52,7 @@ public: class TPortionsDataFetcher: TNonCopyable { private: - const TRequestInput Input; + TRequestInput Input; const std::shared_ptr<IFetchCallback> Callback; std::shared_ptr<TClassCounters> ClassCounters; NCounters::TStateSignalsOperator<EFetchingStage>::TGuard Guard; @@ -80,7 +80,7 @@ public: , ClassCounters(Singleton<TCounters>()->GetClassCounters(Callback->GetClassName())) , Guard(ClassCounters->GetGuard(EFetchingStage::Created)) , Script(script) - , CurrentContext(input.GetMemoryProcessInfo()) + , CurrentContext(Input.GetMemoryProcessGuard()) , Environment(environment) , ConveyorCategory(conveyorCategory) { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp index bfa6b64c88c..657da7928a1 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp @@ -40,8 +40,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit) }; ProcessMemoryGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages); - ProcessScopeGuard = - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId()); + ProcessScopeGuard = ProcessMemoryGuard->BuildScopeGuard(GetCommonContext()->GetScanId()); auto readSchema = ReadMetadata->GetResultSchema(); SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp index a431b81077f..00b5e6f89df 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp @@ -38,8 +38,7 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi , TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) , Sources(sources) , IntervalIdx(intervalIdx) - , IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( - Context->GetProcessMemoryControlId(), context->GetCommonContext()->GetScanId())) + , IntervalGroupGuard(Context->GetProcessScopeGuard()->BuildGroupGuard()) , IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard()) { AFL_VERIFY(Sources.size()); for (auto&& [_, i] : Sources) { diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index a25c1df37cd..6122d4cd7b4 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -33,8 +33,7 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) InitStageData(std::make_unique<TFetchedData>( GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCountOptional())); ProcessingStarted = true; - SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( - GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); + SourceGroupGuard = GetContext()->GetProcessScopeGuard()->BuildGroupGuard(); SetMemoryGroupId(SourceGroupGuard->GetGroupId()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx()); // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); diff --git a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp index 53499aae026..3bbab05256c 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp @@ -96,6 +96,7 @@ void TManager::RegisterAllocation(const ui64 externalProcessId, const ui64 exter process->RegisterAllocation(externalScopeId, externalGroupId, task, stageIdx); } else { AFL_VERIFY(!task->OnAllocated(std::make_shared<TAllocationGuard>(externalProcessId, externalScopeId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))( + "process", externalProcessId)("scope", externalScopeId)( "ext_group", externalGroupId)("stage_idx", stageIdx); } RefreshSignals(); diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h index eeb4169eec7..6dbd78ce9d0 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h @@ -12,6 +12,10 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { +class TGroupGuard; +class TScopeGuard; +class TProcessGuard; + class TGroupGuard { private: const NActors::TActorId ActorId; @@ -33,6 +37,10 @@ private: public: TProcessGuard(const NActors::TActorId& actorId, const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages); + std::shared_ptr<TScopeGuard> BuildScopeGuard(const ui32 scopeId) const { + return std::make_shared<TScopeGuard>(ActorId, ProcessId, scopeId); + } + ~TProcessGuard(); }; @@ -45,6 +53,11 @@ private: public: TScopeGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 scopeId); + std::shared_ptr<TGroupGuard> BuildGroupGuard() const { + static TAtomicCounter counter = 0; + return std::make_shared<TGroupGuard>(ActorId, ProcessId, ScopeId, counter.Inc()); + } + ~TScopeGuard(); }; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h b/ydb/core/tx/limiter/grouped_memory/usage/service.h index 8e7608bc115..d119cbf7df0 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/service.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h @@ -14,6 +14,7 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { template <class TMemoryLimiterPolicy> class TServiceOperatorImpl { private: + TAtomicCounter LastProcessId = 0; TConfig ServiceConfig = TConfig::BuildDisabledConfig(); std::shared_ptr<TCounters> Counters; std::shared_ptr<TStageFeatures> DefaultStageFeatures = @@ -50,20 +51,18 @@ public: return Singleton<TSelf>()->DefaultStageFeatures; } - static std::shared_ptr<TGroupGuard> BuildGroupGuard(const ui64 processId, const ui32 scopeId) { - static TAtomicCounter counter = 0; + static std::shared_ptr<TProcessGuard> BuildProcessGuard(const std::vector<std::shared_ptr<TStageFeatures>>& stages) + requires(!TMemoryLimiterPolicy::ExternalProcessIdAllocation) + { + ui64 processId = Singleton<TSelf>()->LastProcessId.Inc(); auto& context = NActors::TActorContext::AsActorContext(); const NActors::TActorId& selfId = context.SelfID; - return std::make_shared<TGroupGuard>(MakeServiceId(selfId.NodeId()), processId, scopeId, counter.Inc()); - } - - static std::shared_ptr<TScopeGuard> BuildScopeGuard(const ui64 processId, const ui32 scopeId) { - auto& context = NActors::TActorContext::AsActorContext(); - const NActors::TActorId& selfId = context.SelfID; - return std::make_shared<TScopeGuard>(MakeServiceId(selfId.NodeId()), processId, scopeId); + return std::make_shared<TProcessGuard>(MakeServiceId(selfId.NodeId()), processId, stages); } - static std::shared_ptr<TProcessGuard> BuildProcessGuard(const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages) { + static std::shared_ptr<TProcessGuard> BuildProcessGuard(const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages) + requires(TMemoryLimiterPolicy::ExternalProcessIdAllocation) + { auto& context = NActors::TActorContext::AsActorContext(); const NActors::TActorId& selfId = context.SelfID; return std::make_shared<TProcessGuard>(MakeServiceId(selfId.NodeId()), processId, stages); @@ -102,6 +101,7 @@ class TScanMemoryLimiterPolicy { public: static const inline TString Name = "Scan"; static const inline NMemory::EMemoryConsumerKind ConsumerKind = NMemory::EMemoryConsumerKind::ScanGroupedMemoryLimiter; + static constexpr bool ExternalProcessIdAllocation = true; }; using TScanMemoryLimiterOperator = TServiceOperatorImpl<TScanMemoryLimiterPolicy>; @@ -110,6 +110,7 @@ class TCompMemoryLimiterPolicy { public: static const inline TString Name = "Comp"; static const inline NMemory::EMemoryConsumerKind ConsumerKind = NMemory::EMemoryConsumerKind::CompGroupedMemoryLimiter; + static constexpr bool ExternalProcessIdAllocation = false; }; using TCompMemoryLimiterOperator = TServiceOperatorImpl<TCompMemoryLimiterPolicy>; |
