diff options
| author | Oleg Doronin <[email protected]> | 2025-10-21 22:43:03 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-21 20:43:03 +0000 |
| commit | 07aeccc41b43c9fc0b7da7680340fbac01b81427 (patch) | |
| tree | ea8613a88ccdbe8e2458de8aedce91990f76a796 | |
| parent | 1b2c55d2050b2a796436b4f0a6eb938934589f10 (diff) | |
parallel compaction has been supported (#27208)
18 files changed, 114 insertions, 89 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8156ff57056..8daf055e8a1 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -2345,6 +2345,7 @@ message TColumnShardConfig { optional uint64 InFlightLocksRangesBytesLimit = 53 [default = 1073741824]; optional bool CombineChunksInResult = 54 [default = true]; optional bool EnableDiagnostics = 55 [default = true]; + optional bool EnableParallelCompaction = 56 [default = true]; } message TSchemeShardConfig { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 4a6504183a5..f5d3b3f822e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -623,28 +623,31 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo Counters.GetCSCounters().OnSetupCompaction(); BackgroundController.ResetWaitingPriority(); - auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(DataLocksManager); - if (!indexChanges) { + auto indexChangesList = TablesManager.MutablePrimaryIndex().StartCompaction(DataLocksManager); + + if (indexChangesList.empty()) { LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID()); return; } - auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get()); - compaction.SetActivityFlag(GetTabletActivity()); - compaction.SetQueueGuard(guard); - compaction.Start(*this); + for (const auto& indexChanges : indexChangesList) { + auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get()); + compaction.SetActivityFlag(GetTabletActivity()); + compaction.SetQueueGuard(guard); + compaction.Start(*this); - auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); - static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures = - NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("COMPACTION", NOlap::TGlobalLimits::GeneralCompactionMemoryLimit); - auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); - NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo, - NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard); - auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); - NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput), - std::make_shared<TCompactionExecutor>( - TabletID(), SelfId(), indexChanges, actualIndexInfo, Counters.GetIndexationCounters(), GetLastCompletedTx(), TabletActivityImpl), - env, NConveyorComposite::ESpecialTaskCategory::Compaction); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); + static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("COMPACTION", NOlap::TGlobalLimits::GeneralCompactionMemoryLimit); + auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); + NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo, + NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard); + auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); + NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput), + std::make_shared<TCompactionExecutor>( + TabletID(), SelfId(), indexChanges, actualIndexInfo, Counters.GetIndexationCounters(), GetLastCompletedTx(), TabletActivityImpl), + env, NConveyorComposite::ESpecialTaskCategory::Compaction); + } } class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber { diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index a0286caac8b..97293278806 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -148,7 +148,7 @@ public: virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0; virtual std::vector<std::shared_ptr<TPortionInfo>> Select( TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const = 0; - virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0; + virtual std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0; virtual ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds, const std::optional<ui64> waitingPriority) const noexcept = 0; virtual std::shared_ptr<TCleanupPortionsColumnEngineChanges> StartCleanupPortions(const TSnapshot& snapshot, diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index fc3402e9ff7..b4bb20507c0 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -211,17 +211,17 @@ ui64 TColumnEngineForLogs::GetCompactionPriority(const std::shared_ptr<NDataLock } } -std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction( +std::vector<std::shared_ptr<TColumnEngineChanges>> TColumnEngineForLogs::StartCompaction( const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept { AFL_VERIFY(dataLocksManager); auto granule = GranulesStorage->GetGranuleForCompaction(dataLocksManager); if (!granule) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction"); - return nullptr; + return {}; } granule->OnStartCompaction(); - auto changes = granule->GetOptimizationTask(granule, dataLocksManager); - if (!changes) { + auto changes = granule->GetOptimizationTasks(granule, dataLocksManager); + if (changes.empty()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")( "weight", granule->GetCompactionPriority().DebugString()); } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 8454612d502..ce1916a9558 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -151,7 +151,7 @@ public: } ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds, const std::optional<ui64> waitingPriority) const noexcept override; - std::shared_ptr<TColumnEngineChanges> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept override; + std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept override; std::shared_ptr<TCleanupPortionsColumnEngineChanges> StartCleanupPortions(const TSnapshot& snapshot, const THashSet<TInternalPathId>& pathsToDrop, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept override; std::shared_ptr<TCleanupTablesColumnEngineChanges> StartCleanupTables(const THashSet<TInternalPathId>& pathsToDrop) noexcept override; diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.h b/ydb/core/tx/columnshard/engines/storage/granule/granule.h index ada8fd426a3..5ec337a8a4d 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.h @@ -281,9 +281,9 @@ public: void BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const; - std::shared_ptr<TColumnEngineChanges> GetOptimizationTask( + std::vector<std::shared_ptr<TColumnEngineChanges>> GetOptimizationTasks( std::shared_ptr<TGranuleMeta> self, const std::shared_ptr<NDataLocks::TManager>& locksManager) const { - return OptimizerPlanner->GetOptimizationTask(self, locksManager); + return OptimizerPlanner->GetOptimizationTasks(self, locksManager); } const NGranule::NPortionsIndex::TPortionsIndex& GetPortionsIndex() const { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp index 5054abba27a..51a065e0d60 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp @@ -4,9 +4,9 @@ namespace NKikimr::NOlap::NStorageOptimizer { -std::shared_ptr<TColumnEngineChanges> IOptimizerPlanner::GetOptimizationTask(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const { +std::vector<std::shared_ptr<TColumnEngineChanges>> IOptimizerPlanner::GetOptimizationTasks(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const { NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("path_id", PathId)); - return DoGetOptimizationTask(granule, dataLocksManager); + return DoGetOptimizationTasks(granule, dataLocksManager); } IOptimizerPlanner::TModificationGuard& IOptimizerPlanner::TModificationGuard::AddPortion(const std::shared_ptr<TPortionInfo>& portion) { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h index 380bcf62712..29f3ae345fc 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h @@ -113,7 +113,7 @@ private: protected: virtual void DoModifyPortions( const THashMap<ui64, std::shared_ptr<TPortionInfo>>& add, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& remove) = 0; - virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask( + virtual std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks( std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const = 0; virtual TOptimizationPriority DoGetUsefulMetric() const = 0; virtual void DoActualize(const TInstant currentInstant) = 0; @@ -251,7 +251,7 @@ public: DoModifyPortions(add, remove); } - std::shared_ptr<TColumnEngineChanges> GetOptimizationTask( + std::vector<std::shared_ptr<TColumnEngineChanges>> GetOptimizationTasks( std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const; TOptimizationPriority GetUsefulMetric() const { auto result = DoGetUsefulMetric(); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h index 2cea48e9991..f291deadac1 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h @@ -843,7 +843,7 @@ public: } } - std::shared_ptr<TColumnEngineChanges> BuildOptimizationTask(std::shared_ptr<TGranuleMeta> granule, + std::vector<std::shared_ptr<TColumnEngineChanges>> BuildOptimizationTasks(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager, const NArrow::TSimpleRow* nextBorder, const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::shared_ptr<IStoragesManager>& storagesManager) const { auto youngestPortion = GetYoungestPortion(nextBorder); @@ -894,7 +894,7 @@ public: size += i->GetTotalBlobBytes(); if (locksManager->IsLocked(*i, NDataLocks::ELockCategory::Compaction)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("info", Others.DebugString())("event", "skip_optimization")("reason", "busy"); - return nullptr; + return {}; } } AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stop_instant", stopInstant)("size", size)("next", @@ -916,7 +916,7 @@ public: NArrow::NMerger::TSortableBatchPosition pos(stopPoint->ToBatch(), 0, primaryKeysSchema->field_names(), {}, false); result->AddCheckPoint(pos, false); } - return result; + return { result }; } void AddOther(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) { @@ -1135,29 +1135,29 @@ public: } } - std::shared_ptr<TColumnEngineChanges> BuildOptimizationTask( + std::vector<std::shared_ptr<TColumnEngineChanges>> BuildOptimizationTasks( std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const { AFL_VERIFY(BucketsByWeight.size()); if (!BucketsByWeight.begin()->first) { - return nullptr; + return {}; } AFL_VERIFY(BucketsByWeight.begin()->second.size()); const TPortionsBucket* bucketForOptimization = *BucketsByWeight.begin()->second.begin(); if (bucketForOptimization == LeftBucket.get()) { if (Buckets.size()) { - return bucketForOptimization->BuildOptimizationTask( + return bucketForOptimization->BuildOptimizationTasks( granule, locksManager, &Buckets.begin()->first, PrimaryKeysSchema, StoragesManager); } else { - return bucketForOptimization->BuildOptimizationTask(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager); + return bucketForOptimization->BuildOptimizationTasks(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager); } } else { auto it = Buckets.find(bucketForOptimization->GetPortion()->IndexKeyStart()); AFL_VERIFY(it != Buckets.end()); ++it; if (it != Buckets.end()) { - return bucketForOptimization->BuildOptimizationTask(granule, locksManager, &it->first, PrimaryKeysSchema, StoragesManager); + return bucketForOptimization->BuildOptimizationTasks(granule, locksManager, &it->first, PrimaryKeysSchema, StoragesManager); } else { - return bucketForOptimization->BuildOptimizationTask(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager); + return bucketForOptimization->BuildOptimizationTasks(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager); } } } @@ -1252,9 +1252,9 @@ protected: Buckets.AddPortion(i, now); } } - virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask( + virtual std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks( std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override { - return Buckets.BuildOptimizationTask(granule, locksManager); + return Buckets.BuildOptimizationTasks(granule, locksManager); } virtual void DoActualize(const TInstant currentInstant) override { Buckets.Actualize(currentInstant); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h index 898b8587f09..797737ace1b 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h @@ -332,7 +332,7 @@ private: virtual ui64 DoGetWeight(bool highPriority) const = 0; virtual TInstant DoGetWeightExpirationInstant() const = 0; virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& pkSchema) const = 0; - virtual TCompactionTaskData DoGetOptimizationTask() const = 0; + virtual std::vector<TCompactionTaskData> DoGetOptimizationTasks() const = 0; virtual std::optional<TPortionsChain> DoGetAffectedPortions(const NArrow::TSimpleRow& from, const NArrow::TSimpleRow& to) const = 0; virtual ui64 DoGetAffectedPortionBytes(const NArrow::TSimpleRow& from, const NArrow::TSimpleRow& to) const = 0; @@ -503,10 +503,13 @@ public: return DoGetBucketPositions(pkSchema); } - TCompactionTaskData GetOptimizationTask() const { + std::vector<TCompactionTaskData> GetOptimizationTasks() const { AFL_VERIFY(NextLevel); - TCompactionTaskData result = DoGetOptimizationTask(); - AFL_VERIFY(!result.IsEmpty()); + std::vector<TCompactionTaskData> result = DoGetOptimizationTasks(); + AFL_VERIFY(!result.empty()); + for (const auto& compactionTaskData: result) { + AFL_VERIFY(!compactionTaskData.IsEmpty()); + } return result; } }; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp index d81c328e2b6..745b4e6d666 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp @@ -50,7 +50,7 @@ std::vector<TPortionInfo::TPtr> TOneLayerPortions::DoModifyPortions( return problems; } -TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const { +std::vector<TCompactionTaskData> TOneLayerPortions::DoGetOptimizationTasks() const { AFL_VERIFY(GetNextLevel()); ui64 compactedData = 0; TCompactionTaskData result(GetNextLevel()->GetLevelId()); @@ -76,7 +76,7 @@ TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const { --itBkwd; } } - return result; + return { result }; } } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h index 7fceec632ed..41f514bc049 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h @@ -146,7 +146,7 @@ public: virtual std::vector<TPortionInfo::TPtr> DoModifyPortions( const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override; - virtual TCompactionTaskData DoGetOptimizationTask() const override; + virtual std::vector<TCompactionTaskData> DoGetOptimizationTasks() const override; virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override { NArrow::NMerger::TIntervalPositions result; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp index bb433c9cab7..8c2bc9134c2 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp @@ -2,7 +2,7 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { -TCompactionTaskData TZeroLevelPortions::DoGetOptimizationTask() const { +std::vector<TCompactionTaskData> TZeroLevelPortions::DoGetOptimizationTasks() const { AFL_VERIFY(Portions.size()); TCompactionTaskData result(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>()); for (auto&& i : Portions) { @@ -19,7 +19,7 @@ TCompactionTaskData TZeroLevelPortions::DoGetOptimizationTask() const { } else { PredOptimization = std::nullopt; } - return result; + return { result }; } ui64 TZeroLevelPortions::DoGetWeight(bool highPriority) const { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h index e043b01e428..092f1424d93 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h @@ -73,7 +73,7 @@ private: virtual ui64 DoGetWeight(bool highPriority) const override; virtual TInstant DoGetWeightExpirationInstant() const override; - virtual TCompactionTaskData DoGetOptimizationTask() const override; + virtual std::vector<TCompactionTaskData> DoGetOptimizationTasks() const override; virtual ui64 GetExpectedPortionSize() const override { return ExpectedBlobsSize; } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp index 921c4ddab58..708292719bc 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp @@ -24,31 +24,49 @@ TOptimizerPlanner::TOptimizerPlanner(const TInternalPathId pathId, const std::sh RefreshWeights(); } -std::shared_ptr<TColumnEngineChanges> TOptimizerPlanner::DoGetOptimizationTask( +std::vector<std::shared_ptr<TColumnEngineChanges>> TOptimizerPlanner::DoGetOptimizationTasks( std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const { AFL_VERIFY(LevelsByWeight.size()); - auto level = LevelsByWeight.begin()->second; - auto data = level->GetOptimizationTask(); + TSaverContext saverContext(StoragesManager); - std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges> result; - // if (level->GetLevelId() == 0) { - result = - std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(granule, data.GetRepackPortions(level->GetLevelId()), saverContext); - // } else { - // result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>( - // granule, data.GetRepackPortions(level->GetLevelId()), saverContext); - // result->AddMovePortions(data.GetMovePortions()); - // } - result->SetTargetCompactionLevel(data.GetTargetCompactionLevel()); - result->SetPortionExpectedSize(Levels[data.GetTargetCompactionLevel()]->GetExpectedPortionSize()); - auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())( - "level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString()); - result->SetCheckPoints(std::move(positions)); - for (auto&& i : result->GetSwitchedPortions()) { - AFL_VERIFY(!locksManager->IsLocked(i, NDataLocks::ELockCategory::Compaction)); + std::vector<std::shared_ptr<TColumnEngineChanges>> results; + for (const auto& [weight, level]: LevelsByWeight) { + if (weight == 0) { + break; + } + auto tasks = level->GetOptimizationTasks(); + for (auto& data: tasks) { + if (data.IsEmpty()) { + continue; + } + if (!results.empty() && (dynamic_pointer_cast<TOneLayerPortions>(Levels[data.GetTargetCompactionLevel()]) || dynamic_pointer_cast<TOneLayerPortions>(level))) { + return results; + } + std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges> result; + // if (level->GetLevelId() == 0) { + result = + std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(granule, data.GetRepackPortions(level->GetLevelId()), saverContext); + // } else { + // result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>( + // granule, data.GetRepackPortions(level->GetLevelId()), saverContext); + // result->AddMovePortions(data.GetMovePortions()); + // } + result->SetTargetCompactionLevel(data.GetTargetCompactionLevel()); + result->SetPortionExpectedSize(Levels[data.GetTargetCompactionLevel()]->GetExpectedPortionSize()); + auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())( + "level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString()); + result->SetCheckPoints(std::move(positions)); + for (auto&& i : result->GetSwitchedPortions()) { + AFL_VERIFY(!locksManager->IsLocked(i, NDataLocks::ELockCategory::Compaction)); + } + results.push_back(result); + if (!AppDataVerified().ColumnShardConfig.GetEnableParallelCompaction()) { + return results; + } + } } - return result; + return results; } } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h index 5f1091a8475..3b7ed60dfa8 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h @@ -106,7 +106,7 @@ protected: } RefreshWeights(); } - virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask( + virtual std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks( std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override; virtual void DoActualize(const TInstant currentInstant) override { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp index 31faad0b0da..be07ff55ca2 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp @@ -640,24 +640,24 @@ private: return result; } - std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override { + std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override { // Check compactions, top to bottom for (size_t level = 0; level < Max(Accumulator.size(), Levels.size()); ++level) { if (level < Accumulator.size()) { if (auto result = GetCompactAccumulatorTask(granule, locksManager, level)) { - return result; + return { result }; } } if (level < Levels.size()) { if (auto result = GetCompactLevelTask(granule, locksManager, level)) { - return result; + return { result }; } } } // Nothing to compact AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("message", "tiling compaction: nothing to compact"); - return nullptr; + return {}; } void DoActualize(const TInstant currentInstant) override { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp index a329234bf93..02ac00e5146 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp @@ -40,8 +40,8 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) { planner.AddPortion(maker.Make(1, 100, 10000)); Cerr << planner.GetDescription() << Endl; NKikimr::NOlap::TCompactionLimits limits; - auto task = planner.GetOptimizationTask(limits, nullptr); - Y_ABORT_UNLESS(!task); + auto tasks = planner.GetOptimizationTasks(limits, nullptr); + Y_ABORT_UNLESS(!tasks.empty()); } Y_UNIT_TEST(MergeSmall) { @@ -51,9 +51,9 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) { planner.AddPortion(maker.Make(101, 200, 10000)); Cerr << planner.GetDescription() << Endl; NKikimr::NOlap::TCompactionLimits limits; - auto task = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr)); - Y_ABORT_UNLESS(task); - Y_ABORT_UNLESS(task->SwitchedPortions.size() == 2); + auto tasks = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTasks(limits, nullptr)); + Y_ABORT_UNLESS(!tasks.empty()); + Y_ABORT_UNLESS(tasks.front()->SwitchedPortions.size() == 2); } Y_UNIT_TEST(MergeSmall1) { @@ -65,9 +65,9 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) { planner.AddPortion(maker.Make(10, 20, 40000)); Cerr << planner.GetDescription() << Endl; NKikimr::NOlap::TCompactionLimits limits; - auto task = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr)); - Y_ABORT_UNLESS(task); - Y_ABORT_UNLESS(task->SwitchedPortions.size() == 4); + auto tasks = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTasks(limits, nullptr)); + Y_ABORT_UNLESS(!tasks.empty()); + Y_ABORT_UNLESS(tasks.front()->SwitchedPortions.size() == 4); } Y_UNIT_TEST(MergeSmall2) { @@ -80,11 +80,11 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) { planner.AddPortion(maker.Make(1000, 2000, 40000)); Cerr << planner.GetDescription() << Endl; NKikimr::NOlap::TCompactionLimits limits; - auto task = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr)); - Y_ABORT_UNLESS(task); - Y_ABORT_UNLESS(task->SwitchedPortions.size() == 2); - Y_ABORT_UNLESS(task->SwitchedPortions[0].GetPortionId() == 1); - Y_ABORT_UNLESS(task->SwitchedPortions[1].GetPortionId() == 2); + auto tasks = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTasks(limits, nullptr)); + Y_ABORT_UNLESS(!tasks.empty()); + Y_ABORT_UNLESS(tasks.front()->SwitchedPortions.size() == 2); + Y_ABORT_UNLESS(tasks.front()->SwitchedPortions[0].GetPortionId() == 1); + Y_ABORT_UNLESS(tasks.front()->SwitchedPortions[1].GetPortionId() == 2); } }; |
