diff options
| author | Oleg Doronin <[email protected]> | 2025-10-24 11:57:44 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-24 12:57:44 +0300 |
| commit | 106129cea0eec233357df0f22a65d2b8ff3068d1 (patch) | |
| tree | 191f8f96c69f8104342246e8f9ecca95f15cf33a | |
| parent | afd8d5d7d72596be32f1851b79ad50cd5b751ba5 (diff) | |
concurrency for zero level (#27517)
6 files changed, 51 insertions, 17 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 33acc8bc3fa..fc045eb78f0 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -540,6 +540,7 @@ message TCompactionLevelConstructorContainer { optional uint64 PortionsCountAvailable = 3; optional uint64 PortionsCountLimit = 4; optional uint64 PortionsSizeLimit = 5; + optional uint64 Concurrency = 6; } message TOneLayer { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp index 62639e9f027..ba70053034f 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp @@ -65,14 +65,14 @@ TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::Do 1, levels.back(), counters->GetLevelCounters(1), std::make_shared<TLimitsOverloadChecker>(1ull << 20, 2048 * (1ull << 30)), TDuration::Max(), 2 * (1ull << 20), 1, - selectors, defaultSelectorName, ui64(1) << 63 + selectors, defaultSelectorName, 1, ui64(1) << 63 )); levels.emplace_back(std::make_shared<TZeroLevelPortions>( 0, levels.back(), counters->GetLevelCounters(0), std::make_shared<TLimitsOverloadChecker>(1ull << 20, 16 * (1ull << 30)), TDuration::Minutes(1), 1ull << 20, 2, - selectors, defaultSelectorName, ui64(1) << 63, true /* compactAtLevel */ + selectors, defaultSelectorName, 1, ui64(1) << 63, true /* compactAtLevel */ )); break; @@ -81,13 +81,13 @@ TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::Do 1, nullptr, counters->GetLevelCounters(1), std::make_shared<TNoOverloadChecker>(), TDuration::Max(), 8 << 20, 1, - selectors, defaultSelectorName + selectors, defaultSelectorName, 1 )); levels.emplace_back(std::make_shared<TZeroLevelPortions>( 0, levels.back(), counters->GetLevelCounters(0), std::make_shared<TLimitsOverloadChecker>(1'000'000, 8 * (1ull << 30)), TDuration::Max(), 4 << 20, 1, - selectors, defaultSelectorName + selectors, defaultSelectorName, 1 )); break; @@ -96,19 +96,19 @@ TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::Do 2, nullptr, counters->GetLevelCounters(2), std::make_shared<TNoOverloadChecker>(), TDuration::Max(), 8 * (1ull << 20), 1, - selectors, defaultSelectorName + selectors, defaultSelectorName, 1 )); levels.emplace_back(std::make_shared<TZeroLevelPortions>( 1, levels.back(), counters->GetLevelCounters(1), std::make_shared<TLimitsOverloadChecker>(1'000'000, 8 * (1ull << 30)), TDuration::Max(), 4 * (1ull << 20), 1, - selectors, defaultSelectorName + selectors, defaultSelectorName, 1 )); levels.emplace_back(std::make_shared<TZeroLevelPortions>( 0, levels.back(), counters->GetLevelCounters(0), std::make_shared<TLimitsOverloadChecker>(1'000'000, 8 * (1ull << 30)), TDuration::Seconds(180), 2 * (1ull << 20), 1, - selectors, defaultSelectorName + selectors, defaultSelectorName, 1 )); break; } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp index 17b79af3876..ede29c2485f 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp @@ -48,6 +48,14 @@ TConclusionStatus TZeroLevelConstructor::DoDeserializeFromJson(const NJson::TJso } PortionsSizeLimit = jsonValue.GetUInteger(); } + + if (json.Has("concurrency")) { + const auto& jsonValue = json["concurrency"]; + if (!jsonValue.IsUInteger() || jsonValue.GetUInteger() == 0) { + return TConclusionStatus::Fail("incorrect concurrency value (have to be positive unsigned int)"); + } + Concurrency = jsonValue.GetUInteger(); + } return TConclusionStatus::Success(); } @@ -71,6 +79,9 @@ bool TZeroLevelConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompa if (pLevel.HasPortionsSizeLimit()) { PortionsSizeLimit = pLevel.GetPortionsSizeLimit(); } + if (pLevel.HasConcurrency()) { + Concurrency = pLevel.GetConcurrency(); + } return true; } @@ -91,6 +102,9 @@ void TZeroLevelConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevel if (PortionsSizeLimit) { mLevel.SetPortionsSizeLimit(*PortionsSizeLimit); } + if (Concurrency) { + mLevel.SetConcurrency(*Concurrency); + } } std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> TZeroLevelConstructor::DoBuildLevel( @@ -99,7 +113,7 @@ std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> T return std::make_shared<TZeroLevelPortions>(indexLevel, nextLevel, counters, std::make_shared<TLimitsOverloadChecker>(PortionsCountLimit.value_or(1000000), PortionsSizeLimit), PortionsLiveDuration.value_or(TDuration::Max()), ExpectedBlobsSize.value_or((ui64)1 << 20), PortionsCountAvailable.value_or(10), - selectors, GetDefaultSelectorName()); + selectors, GetDefaultSelectorName(), Concurrency.value_or(1)); } } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h index 4fb8cda439b..436f960b427 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h @@ -15,6 +15,7 @@ private: std::optional<ui64> PortionsCountAvailable; std::optional<ui64> PortionsCountLimit; std::optional<ui64> PortionsSizeLimit; + std::optional<ui64> Concurrency; virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters, diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp index 8c2bc9134c2..731a7e75157 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp @@ -3,23 +3,37 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { std::vector<TCompactionTaskData> TZeroLevelPortions::DoGetOptimizationTasks() const { + std::vector<TCompactionTaskData> result; AFL_VERIFY(Portions.size()); - TCompactionTaskData result(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>()); + result.emplace_back(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>()); + i64 tasksLeft = GetMaxConcurrency(); for (auto&& i : Portions) { - result.AddCurrentLevelPortion( + result.back().AddCurrentLevelPortion( i.GetPortion(), NextLevel->GetAffectedPortions(i.GetPortion()->IndexKeyStart(), i.GetPortion()->IndexKeyEnd()), true); - if (!result.CanTakeMore()) { + if (!result.back().CanTakeMore()) { // result.SetStopSeparation(i.GetPortion()->IndexKeyStart()); - break; + if (--tasksLeft <= 0) { + break; + } + result.emplace_back(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>()); } } + + if (result.back().IsEmpty()) { + result.pop_back(); + } + AFL_VERIFY(!result.empty()); - if (result.CanTakeMore()) { + if (result.back().CanTakeMore()) { PredOptimization = TInstant::Now(); } else { PredOptimization = std::nullopt; } - return { result }; + return result; +} + +ui64 TZeroLevelPortions::GetMaxConcurrency() const { + return std::clamp(ui64(GetPortionsInfo().PredictPackedBlobBytes(GetPackKff()) / std::max(NextLevel->GetExpectedPortionSize(), GetExpectedPortionSize())), ui64(1), Concurrency); } ui64 TZeroLevelPortions::DoGetWeight(bool highPriority) const { @@ -75,13 +89,14 @@ TInstant TZeroLevelPortions::DoGetWeightExpirationInstant() const { TZeroLevelPortions::TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop, const ui64 expectedBlobsSize, const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, - const TString& defaultSelectorName, const ui64 highPriorityContribution, bool compactAtLevel) + const TString& defaultSelectorName, const ui64 concurrency, const ui64 highPriorityContribution, bool compactAtLevel) : TBase(levelIdx, nextLevel, overloadChecker, levelCounters, selectors, defaultSelectorName) , DurationToDrop(durationToDrop) , ExpectedBlobsSize(expectedBlobsSize) , PortionsCountAvailable(portionsCountAvailable) , HighPriorityContribution(highPriorityContribution) - , CompactAtLevel(compactAtLevel) { + , CompactAtLevel(compactAtLevel) + , Concurrency(concurrency) { if (DurationToDrop != TDuration::Max() && PredOptimization) { *PredOptimization -= TDuration::Seconds(RandomNumber<ui32>(DurationToDrop.Seconds())); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h index 092f1424d93..07e3a8482e2 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h @@ -12,6 +12,7 @@ private: const ui64 PortionsCountAvailable; const ui64 HighPriorityContribution; const bool CompactAtLevel; + const ui64 Concurrency; std::set<TOrderedPortion> Portions; @@ -77,12 +78,14 @@ private: virtual ui64 GetExpectedPortionSize() const override { return ExpectedBlobsSize; } + + ui64 GetMaxConcurrency() const; public: TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop, const ui64 expectedBlobsSize, const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, const TString& defaultSelectorName, - const ui64 highPriorityContribution = 0, bool compactAtLevel = false); + const ui64 concurrency, const ui64 highPriorityContribution = 0, bool compactAtLevel = false); }; } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets |
