summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Oleg Doronin <[email protected]> 2025-10-21 22:43:03 +0200
committer GitHub <[email protected]> 2025-10-21 20:43:03 +0000
commit 07aeccc41b43c9fc0b7da7680340fbac01b81427 (patch)
tree ea8613a88ccdbe8e2458de8aedce91990f76a796
parent 1b2c55d2050b2a796436b4f0a6eb938934589f10 (diff)
parallel compaction has been supported (#27208)
-rw-r--r-- ydb/core/protos/config.proto 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 37
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/granule/granule.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h 4
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h 22
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h 11
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp 60
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp 26
18 files changed, 114 insertions, 89 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 8156ff57056..8daf055e8a1 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -2345,6 +2345,7 @@ message TColumnShardConfig {
optional uint64 InFlightLocksRangesBytesLimit = 53 [default = 1073741824];
optional bool CombineChunksInResult = 54 [default = true];
optional bool EnableDiagnostics = 55 [default = true];
+ optional bool EnableParallelCompaction = 56 [default = true];
}
message TSchemeShardConfig {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 4a6504183a5..f5d3b3f822e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -623,28 +623,31 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
Counters.GetCSCounters().OnSetupCompaction();
BackgroundController.ResetWaitingPriority();
- auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(DataLocksManager);
- if (!indexChanges) {
+ auto indexChangesList = TablesManager.MutablePrimaryIndex().StartCompaction(DataLocksManager);
+
+ if (indexChangesList.empty()) {
LOG_S_DEBUG("Compaction not started: cannot prepare compaction at tablet " << TabletID());
return;
}
- auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get());
- compaction.SetActivityFlag(GetTabletActivity());
- compaction.SetQueueGuard(guard);
- compaction.Start(*this);
+ for (const auto& indexChanges : indexChangesList) {
+ auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get());
+ compaction.SetActivityFlag(GetTabletActivity());
+ compaction.SetQueueGuard(guard);
+ compaction.Start(*this);
- auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
- static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
- NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("COMPACTION", NOlap::TGlobalLimits::GeneralCompactionMemoryLimit);
- auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
- NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo,
- NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard);
- auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
- NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
- std::make_shared<TCompactionExecutor>(
- TabletID(), SelfId(), indexChanges, actualIndexInfo, Counters.GetIndexationCounters(), GetLastCompletedTx(), TabletActivityImpl),
- env, NConveyorComposite::ESpecialTaskCategory::Compaction);
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
+ static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
+ NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("COMPACTION", NOlap::TGlobalLimits::GeneralCompactionMemoryLimit);
+ auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
+ NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo,
+ NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard);
+ auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
+ NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
+ std::make_shared<TCompactionExecutor>(
+ TabletID(), SelfId(), indexChanges, actualIndexInfo, Counters.GetIndexationCounters(), GetLastCompletedTx(), TabletActivityImpl),
+ env, NConveyorComposite::ESpecialTaskCategory::Compaction);
+ }
}
class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber {
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index a0286caac8b..97293278806 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -148,7 +148,7 @@ public:
virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0;
virtual std::vector<std::shared_ptr<TPortionInfo>> Select(
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const = 0;
- virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
+ virtual std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
virtual ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds,
const std::optional<ui64> waitingPriority) const noexcept = 0;
virtual std::shared_ptr<TCleanupPortionsColumnEngineChanges> StartCleanupPortions(const TSnapshot& snapshot,
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index fc3402e9ff7..b4bb20507c0 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -211,17 +211,17 @@ ui64 TColumnEngineForLogs::GetCompactionPriority(const std::shared_ptr<NDataLock
}
}
-std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(
+std::vector<std::shared_ptr<TColumnEngineChanges>> TColumnEngineForLogs::StartCompaction(
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept {
AFL_VERIFY(dataLocksManager);
auto granule = GranulesStorage->GetGranuleForCompaction(dataLocksManager);
if (!granule) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction");
- return nullptr;
+ return {};
}
granule->OnStartCompaction();
- auto changes = granule->GetOptimizationTask(granule, dataLocksManager);
- if (!changes) {
+ auto changes = granule->GetOptimizationTasks(granule, dataLocksManager);
+ if (changes.empty()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")(
"weight", granule->GetCompactionPriority().DebugString());
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 8454612d502..ce1916a9558 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -151,7 +151,7 @@ public:
}
ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds,
const std::optional<ui64> waitingPriority) const noexcept override;
- std::shared_ptr<TColumnEngineChanges> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept override;
+ std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept override;
std::shared_ptr<TCleanupPortionsColumnEngineChanges> StartCleanupPortions(const TSnapshot& snapshot,
const THashSet<TInternalPathId>& pathsToDrop, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept override;
std::shared_ptr<TCleanupTablesColumnEngineChanges> StartCleanupTables(const THashSet<TInternalPathId>& pathsToDrop) noexcept override;
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.h b/ydb/core/tx/columnshard/engines/storage/granule/granule.h
index ada8fd426a3..5ec337a8a4d 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.h
@@ -281,9 +281,9 @@ public:
void BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const TDuration actualizationLag) const;
- std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(
+ std::vector<std::shared_ptr<TColumnEngineChanges>> GetOptimizationTasks(
std::shared_ptr<TGranuleMeta> self, const std::shared_ptr<NDataLocks::TManager>& locksManager) const {
- return OptimizerPlanner->GetOptimizationTask(self, locksManager);
+ return OptimizerPlanner->GetOptimizationTasks(self, locksManager);
}
const NGranule::NPortionsIndex::TPortionsIndex& GetPortionsIndex() const {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
index 5054abba27a..51a065e0d60 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.cpp
@@ -4,9 +4,9 @@
namespace NKikimr::NOlap::NStorageOptimizer {
-std::shared_ptr<TColumnEngineChanges> IOptimizerPlanner::GetOptimizationTask(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const {
+std::vector<std::shared_ptr<TColumnEngineChanges>> IOptimizerPlanner::GetOptimizationTasks(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const {
NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("path_id", PathId));
- return DoGetOptimizationTask(granule, dataLocksManager);
+ return DoGetOptimizationTasks(granule, dataLocksManager);
}
IOptimizerPlanner::TModificationGuard& IOptimizerPlanner::TModificationGuard::AddPortion(const std::shared_ptr<TPortionInfo>& portion) {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
index 380bcf62712..29f3ae345fc 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
@@ -113,7 +113,7 @@ private:
protected:
virtual void DoModifyPortions(
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& add, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& remove) = 0;
- virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(
+ virtual std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks(
std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const = 0;
virtual TOptimizationPriority DoGetUsefulMetric() const = 0;
virtual void DoActualize(const TInstant currentInstant) = 0;
@@ -251,7 +251,7 @@ public:
DoModifyPortions(add, remove);
}
- std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(
+ std::vector<std::shared_ptr<TColumnEngineChanges>> GetOptimizationTasks(
std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) const;
TOptimizationPriority GetUsefulMetric() const {
auto result = DoGetUsefulMetric();
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h
index 2cea48e9991..f291deadac1 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h
@@ -843,7 +843,7 @@ public:
}
}
- std::shared_ptr<TColumnEngineChanges> BuildOptimizationTask(std::shared_ptr<TGranuleMeta> granule,
+ std::vector<std::shared_ptr<TColumnEngineChanges>> BuildOptimizationTasks(std::shared_ptr<TGranuleMeta> granule,
const std::shared_ptr<NDataLocks::TManager>& locksManager, const NArrow::TSimpleRow* nextBorder,
const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::shared_ptr<IStoragesManager>& storagesManager) const {
auto youngestPortion = GetYoungestPortion(nextBorder);
@@ -894,7 +894,7 @@ public:
size += i->GetTotalBlobBytes();
if (locksManager->IsLocked(*i, NDataLocks::ELockCategory::Compaction)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("info", Others.DebugString())("event", "skip_optimization")("reason", "busy");
- return nullptr;
+ return {};
}
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stop_instant", stopInstant)("size", size)("next",
@@ -916,7 +916,7 @@ public:
NArrow::NMerger::TSortableBatchPosition pos(stopPoint->ToBatch(), 0, primaryKeysSchema->field_names(), {}, false);
result->AddCheckPoint(pos, false);
}
- return result;
+ return { result };
}
void AddOther(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) {
@@ -1135,29 +1135,29 @@ public:
}
}
- std::shared_ptr<TColumnEngineChanges> BuildOptimizationTask(
+ std::vector<std::shared_ptr<TColumnEngineChanges>> BuildOptimizationTasks(
std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const {
AFL_VERIFY(BucketsByWeight.size());
if (!BucketsByWeight.begin()->first) {
- return nullptr;
+ return {};
}
AFL_VERIFY(BucketsByWeight.begin()->second.size());
const TPortionsBucket* bucketForOptimization = *BucketsByWeight.begin()->second.begin();
if (bucketForOptimization == LeftBucket.get()) {
if (Buckets.size()) {
- return bucketForOptimization->BuildOptimizationTask(
+ return bucketForOptimization->BuildOptimizationTasks(
granule, locksManager, &Buckets.begin()->first, PrimaryKeysSchema, StoragesManager);
} else {
- return bucketForOptimization->BuildOptimizationTask(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager);
+ return bucketForOptimization->BuildOptimizationTasks(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager);
}
} else {
auto it = Buckets.find(bucketForOptimization->GetPortion()->IndexKeyStart());
AFL_VERIFY(it != Buckets.end());
++it;
if (it != Buckets.end()) {
- return bucketForOptimization->BuildOptimizationTask(granule, locksManager, &it->first, PrimaryKeysSchema, StoragesManager);
+ return bucketForOptimization->BuildOptimizationTasks(granule, locksManager, &it->first, PrimaryKeysSchema, StoragesManager);
} else {
- return bucketForOptimization->BuildOptimizationTask(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager);
+ return bucketForOptimization->BuildOptimizationTasks(granule, locksManager, nullptr, PrimaryKeysSchema, StoragesManager);
}
}
}
@@ -1252,9 +1252,9 @@ protected:
Buckets.AddPortion(i, now);
}
}
- virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(
+ virtual std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks(
std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override {
- return Buckets.BuildOptimizationTask(granule, locksManager);
+ return Buckets.BuildOptimizationTasks(granule, locksManager);
}
virtual void DoActualize(const TInstant currentInstant) override {
Buckets.Actualize(currentInstant);
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h
index 898b8587f09..797737ace1b 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h
@@ -332,7 +332,7 @@ private:
virtual ui64 DoGetWeight(bool highPriority) const = 0;
virtual TInstant DoGetWeightExpirationInstant() const = 0;
virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& pkSchema) const = 0;
- virtual TCompactionTaskData DoGetOptimizationTask() const = 0;
+ virtual std::vector<TCompactionTaskData> DoGetOptimizationTasks() const = 0;
virtual std::optional<TPortionsChain> DoGetAffectedPortions(const NArrow::TSimpleRow& from, const NArrow::TSimpleRow& to) const = 0;
virtual ui64 DoGetAffectedPortionBytes(const NArrow::TSimpleRow& from, const NArrow::TSimpleRow& to) const = 0;
@@ -503,10 +503,13 @@ public:
return DoGetBucketPositions(pkSchema);
}
- TCompactionTaskData GetOptimizationTask() const {
+ std::vector<TCompactionTaskData> GetOptimizationTasks() const {
AFL_VERIFY(NextLevel);
- TCompactionTaskData result = DoGetOptimizationTask();
- AFL_VERIFY(!result.IsEmpty());
+ std::vector<TCompactionTaskData> result = DoGetOptimizationTasks();
+ AFL_VERIFY(!result.empty());
+ for (const auto& compactionTaskData: result) {
+ AFL_VERIFY(!compactionTaskData.IsEmpty());
+ }
return result;
}
};
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp
index d81c328e2b6..745b4e6d666 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp
@@ -50,7 +50,7 @@ std::vector<TPortionInfo::TPtr> TOneLayerPortions::DoModifyPortions(
return problems;
}
-TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const {
+std::vector<TCompactionTaskData> TOneLayerPortions::DoGetOptimizationTasks() const {
AFL_VERIFY(GetNextLevel());
ui64 compactedData = 0;
TCompactionTaskData result(GetNextLevel()->GetLevelId());
@@ -76,7 +76,7 @@ TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const {
--itBkwd;
}
}
- return result;
+ return { result };
}
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h
index 7fceec632ed..41f514bc049 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h
@@ -146,7 +146,7 @@ public:
virtual std::vector<TPortionInfo::TPtr> DoModifyPortions(
const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override;
- virtual TCompactionTaskData DoGetOptimizationTask() const override;
+ virtual std::vector<TCompactionTaskData> DoGetOptimizationTasks() const override;
virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override {
NArrow::NMerger::TIntervalPositions result;
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
index bb433c9cab7..8c2bc9134c2 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
@@ -2,7 +2,7 @@
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
-TCompactionTaskData TZeroLevelPortions::DoGetOptimizationTask() const {
+std::vector<TCompactionTaskData> TZeroLevelPortions::DoGetOptimizationTasks() const {
AFL_VERIFY(Portions.size());
TCompactionTaskData result(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>());
for (auto&& i : Portions) {
@@ -19,7 +19,7 @@ TCompactionTaskData TZeroLevelPortions::DoGetOptimizationTask() const {
} else {
PredOptimization = std::nullopt;
}
- return result;
+ return { result };
}
ui64 TZeroLevelPortions::DoGetWeight(bool highPriority) const {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
index e043b01e428..092f1424d93 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
@@ -73,7 +73,7 @@ private:
virtual ui64 DoGetWeight(bool highPriority) const override;
virtual TInstant DoGetWeightExpirationInstant() const override;
- virtual TCompactionTaskData DoGetOptimizationTask() const override;
+ virtual std::vector<TCompactionTaskData> DoGetOptimizationTasks() const override;
virtual ui64 GetExpectedPortionSize() const override {
return ExpectedBlobsSize;
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp
index 921c4ddab58..708292719bc 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp
@@ -24,31 +24,49 @@ TOptimizerPlanner::TOptimizerPlanner(const TInternalPathId pathId, const std::sh
RefreshWeights();
}
-std::shared_ptr<TColumnEngineChanges> TOptimizerPlanner::DoGetOptimizationTask(
+std::vector<std::shared_ptr<TColumnEngineChanges>> TOptimizerPlanner::DoGetOptimizationTasks(
std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const {
AFL_VERIFY(LevelsByWeight.size());
- auto level = LevelsByWeight.begin()->second;
- auto data = level->GetOptimizationTask();
+
TSaverContext saverContext(StoragesManager);
- std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges> result;
- // if (level->GetLevelId() == 0) {
- result =
- std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
- // } else {
- // result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
- // granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
- // result->AddMovePortions(data.GetMovePortions());
- // }
- result->SetTargetCompactionLevel(data.GetTargetCompactionLevel());
- result->SetPortionExpectedSize(Levels[data.GetTargetCompactionLevel()]->GetExpectedPortionSize());
- auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
- "level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString());
- result->SetCheckPoints(std::move(positions));
- for (auto&& i : result->GetSwitchedPortions()) {
- AFL_VERIFY(!locksManager->IsLocked(i, NDataLocks::ELockCategory::Compaction));
+ std::vector<std::shared_ptr<TColumnEngineChanges>> results;
+ for (const auto& [weight, level]: LevelsByWeight) {
+ if (weight == 0) {
+ break;
+ }
+ auto tasks = level->GetOptimizationTasks();
+ for (auto& data: tasks) {
+ if (data.IsEmpty()) {
+ continue;
+ }
+ if (!results.empty() && (dynamic_pointer_cast<TOneLayerPortions>(Levels[data.GetTargetCompactionLevel()]) || dynamic_pointer_cast<TOneLayerPortions>(level))) {
+ return results;
+ }
+ std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges> result;
+ // if (level->GetLevelId() == 0) {
+ result =
+ std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
+ // } else {
+ // result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
+ // granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
+ // result->AddMovePortions(data.GetMovePortions());
+ // }
+ result->SetTargetCompactionLevel(data.GetTargetCompactionLevel());
+ result->SetPortionExpectedSize(Levels[data.GetTargetCompactionLevel()]->GetExpectedPortionSize());
+ auto positions = data.GetCheckPositions(PrimaryKeysSchema, level->GetLevelId() > 1);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("task_id", result->GetTaskIdentifier())("positions", positions.DebugString())(
+ "level", level->GetLevelId())("target", data.GetTargetCompactionLevel())("data", data.DebugString());
+ result->SetCheckPoints(std::move(positions));
+ for (auto&& i : result->GetSwitchedPortions()) {
+ AFL_VERIFY(!locksManager->IsLocked(i, NDataLocks::ELockCategory::Compaction));
+ }
+ results.push_back(result);
+ if (!AppDataVerified().ColumnShardConfig.GetEnableParallelCompaction()) {
+ return results;
+ }
+ }
}
- return result;
+ return results;
}
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h
index 5f1091a8475..3b7ed60dfa8 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h
@@ -106,7 +106,7 @@ protected:
}
RefreshWeights();
}
- virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(
+ virtual std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks(
std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override;
virtual void DoActualize(const TInstant currentInstant) override {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp
index 31faad0b0da..be07ff55ca2 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/tiling/tiling.cpp
@@ -640,24 +640,24 @@ private:
return result;
}
- std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override {
+ std::vector<std::shared_ptr<TColumnEngineChanges>> DoGetOptimizationTasks(std::shared_ptr<TGranuleMeta> granule, const std::shared_ptr<NDataLocks::TManager>& locksManager) const override {
// Check compactions, top to bottom
for (size_t level = 0; level < Max(Accumulator.size(), Levels.size()); ++level) {
if (level < Accumulator.size()) {
if (auto result = GetCompactAccumulatorTask(granule, locksManager, level)) {
- return result;
+ return { result };
}
}
if (level < Levels.size()) {
if (auto result = GetCompactLevelTask(granule, locksManager, level)) {
- return result;
+ return { result };
}
}
}
// Nothing to compact
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("message", "tiling compaction: nothing to compact");
- return nullptr;
+ return {};
}
void DoActualize(const TInstant currentInstant) override {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
index a329234bf93..02ac00e5146 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
@@ -40,8 +40,8 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) {
planner.AddPortion(maker.Make(1, 100, 10000));
Cerr << planner.GetDescription() << Endl;
NKikimr::NOlap::TCompactionLimits limits;
- auto task = planner.GetOptimizationTask(limits, nullptr);
- Y_ABORT_UNLESS(!task);
+ auto tasks = planner.GetOptimizationTasks(limits, nullptr);
+ Y_ABORT_UNLESS(tasks.empty()); // the removed line expected no task, so the list must be empty
}
Y_UNIT_TEST(MergeSmall) {
@@ -51,9 +51,9 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) {
planner.AddPortion(maker.Make(101, 200, 10000));
Cerr << planner.GetDescription() << Endl;
NKikimr::NOlap::TCompactionLimits limits;
- auto task = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr));
- Y_ABORT_UNLESS(task);
- Y_ABORT_UNLESS(task->SwitchedPortions.size() == 2);
+ auto tasks = planner.GetOptimizationTasks(limits, nullptr);
+ auto task = tasks.empty() ? nullptr : dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(tasks.front()); // cast the first task, not the vector
+ Y_ABORT_UNLESS(task && task->SwitchedPortions.size() == 2);
}
Y_UNIT_TEST(MergeSmall1) {
@@ -65,9 +65,9 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) {
planner.AddPortion(maker.Make(10, 20, 40000));
Cerr << planner.GetDescription() << Endl;
NKikimr::NOlap::TCompactionLimits limits;
- auto task = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr));
- Y_ABORT_UNLESS(task);
- Y_ABORT_UNLESS(task->SwitchedPortions.size() == 4);
+ auto tasks = planner.GetOptimizationTasks(limits, nullptr);
+ auto task = tasks.empty() ? nullptr : dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(tasks.front()); // cast the first task, not the vector
+ Y_ABORT_UNLESS(task && task->SwitchedPortions.size() == 4);
}
Y_UNIT_TEST(MergeSmall2) {
@@ -80,11 +80,11 @@ Y_UNIT_TEST_SUITE(StorageOptimizer) {
planner.AddPortion(maker.Make(1000, 2000, 40000));
Cerr << planner.GetDescription() << Endl;
NKikimr::NOlap::TCompactionLimits limits;
- auto task = dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr));
- Y_ABORT_UNLESS(task);
- Y_ABORT_UNLESS(task->SwitchedPortions.size() == 2);
- Y_ABORT_UNLESS(task->SwitchedPortions[0].GetPortionId() == 1);
- Y_ABORT_UNLESS(task->SwitchedPortions[1].GetPortionId() == 2);
+ auto tasks = planner.GetOptimizationTasks(limits, nullptr);
+ auto task = tasks.empty() ? nullptr : dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(tasks.front()); // cast the first task, not the vector
+ Y_ABORT_UNLESS(task && task->SwitchedPortions.size() == 2);
+ Y_ABORT_UNLESS(task->SwitchedPortions[0].GetPortionId() == 1);
+ Y_ABORT_UNLESS(task->SwitchedPortions[1].GetPortionId() == 2);
}
};