summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg Doronin <[email protected]>2025-10-24 11:57:44 +0200
committerGitHub <[email protected]>2025-10-24 12:57:44 +0300
commit106129cea0eec233357df0f22a65d2b8ff3068d1 (patch)
tree191f8f96c69f8104342246e8f9ecca95f15cf33a
parentafd8d5d7d72596be32f1851b79ad50cd5b751ba5 (diff)
concurrency for zero level (#27517)
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp14
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp16
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h5
6 files changed, 51 insertions, 17 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 33acc8bc3fa..fc045eb78f0 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -540,6 +540,7 @@ message TCompactionLevelConstructorContainer {
optional uint64 PortionsCountAvailable = 3;
optional uint64 PortionsCountLimit = 4;
optional uint64 PortionsSizeLimit = 5;
+ optional uint64 Concurrency = 6;
}
message TOneLayer {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp
index 62639e9f027..ba70053034f 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp
@@ -65,14 +65,14 @@ TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::Do
1, levels.back(), counters->GetLevelCounters(1),
std::make_shared<TLimitsOverloadChecker>(1ull << 20, 2048 * (1ull << 30)),
TDuration::Max(), 2 * (1ull << 20), 1,
- selectors, defaultSelectorName, ui64(1) << 63
+ selectors, defaultSelectorName, 1, ui64(1) << 63
));
levels.emplace_back(std::make_shared<TZeroLevelPortions>(
0, levels.back(), counters->GetLevelCounters(0),
std::make_shared<TLimitsOverloadChecker>(1ull << 20, 16 * (1ull << 30)),
TDuration::Minutes(1), 1ull << 20, 2,
- selectors, defaultSelectorName, ui64(1) << 63, true /* compactAtLevel */
+ selectors, defaultSelectorName, 1, ui64(1) << 63, true /* compactAtLevel */
));
break;
@@ -81,13 +81,13 @@ TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::Do
1, nullptr, counters->GetLevelCounters(1),
std::make_shared<TNoOverloadChecker>(),
TDuration::Max(), 8 << 20, 1,
- selectors, defaultSelectorName
+ selectors, defaultSelectorName, 1
));
levels.emplace_back(std::make_shared<TZeroLevelPortions>(
0, levels.back(), counters->GetLevelCounters(0),
std::make_shared<TLimitsOverloadChecker>(1'000'000, 8 * (1ull << 30)),
TDuration::Max(), 4 << 20, 1,
- selectors, defaultSelectorName
+ selectors, defaultSelectorName, 1
));
break;
@@ -96,19 +96,19 @@ TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::Do
2, nullptr, counters->GetLevelCounters(2),
std::make_shared<TNoOverloadChecker>(),
TDuration::Max(), 8 * (1ull << 20), 1,
- selectors, defaultSelectorName
+ selectors, defaultSelectorName, 1
));
levels.emplace_back(std::make_shared<TZeroLevelPortions>(
1, levels.back(), counters->GetLevelCounters(1),
std::make_shared<TLimitsOverloadChecker>(1'000'000, 8 * (1ull << 30)),
TDuration::Max(), 4 * (1ull << 20), 1,
- selectors, defaultSelectorName
+ selectors, defaultSelectorName, 1
));
levels.emplace_back(std::make_shared<TZeroLevelPortions>(
0, levels.back(), counters->GetLevelCounters(0),
std::make_shared<TLimitsOverloadChecker>(1'000'000, 8 * (1ull << 30)),
TDuration::Seconds(180), 2 * (1ull << 20), 1,
- selectors, defaultSelectorName
+ selectors, defaultSelectorName, 1
));
break;
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp
index 17b79af3876..ede29c2485f 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp
@@ -48,6 +48,14 @@ TConclusionStatus TZeroLevelConstructor::DoDeserializeFromJson(const NJson::TJso
}
PortionsSizeLimit = jsonValue.GetUInteger();
}
+
+ if (json.Has("concurrency")) {
+ const auto& jsonValue = json["concurrency"];
+ if (!jsonValue.IsUInteger() || jsonValue.GetUInteger() == 0) {
+ return TConclusionStatus::Fail("incorrect concurrency value (have to be positive unsigned int)");
+ }
+ Concurrency = jsonValue.GetUInteger();
+ }
return TConclusionStatus::Success();
}
@@ -71,6 +79,9 @@ bool TZeroLevelConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompa
if (pLevel.HasPortionsSizeLimit()) {
PortionsSizeLimit = pLevel.GetPortionsSizeLimit();
}
+ if (pLevel.HasConcurrency()) {
+ Concurrency = pLevel.GetConcurrency();
+ }
return true;
}
@@ -91,6 +102,9 @@ void TZeroLevelConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevel
if (PortionsSizeLimit) {
mLevel.SetPortionsSizeLimit(*PortionsSizeLimit);
}
+ if (Concurrency) {
+ mLevel.SetConcurrency(*Concurrency);
+ }
}
std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> TZeroLevelConstructor::DoBuildLevel(
@@ -99,7 +113,7 @@ std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> T
return std::make_shared<TZeroLevelPortions>(indexLevel, nextLevel, counters,
std::make_shared<TLimitsOverloadChecker>(PortionsCountLimit.value_or(1000000), PortionsSizeLimit),
PortionsLiveDuration.value_or(TDuration::Max()), ExpectedBlobsSize.value_or((ui64)1 << 20), PortionsCountAvailable.value_or(10),
- selectors, GetDefaultSelectorName());
+ selectors, GetDefaultSelectorName(), Concurrency.value_or(1));
}
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h
index 4fb8cda439b..436f960b427 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h
@@ -15,6 +15,7 @@ private:
std::optional<ui64> PortionsCountAvailable;
std::optional<ui64> PortionsCountLimit;
std::optional<ui64> PortionsSizeLimit;
+ std::optional<ui64> Concurrency;
virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel,
const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters,
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
index 8c2bc9134c2..731a7e75157 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
@@ -3,23 +3,37 @@
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
std::vector<TCompactionTaskData> TZeroLevelPortions::DoGetOptimizationTasks() const {
+ std::vector<TCompactionTaskData> result;
AFL_VERIFY(Portions.size());
- TCompactionTaskData result(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>());
+ result.emplace_back(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>());
+ i64 tasksLeft = GetMaxConcurrency();
for (auto&& i : Portions) {
- result.AddCurrentLevelPortion(
+ result.back().AddCurrentLevelPortion(
i.GetPortion(), NextLevel->GetAffectedPortions(i.GetPortion()->IndexKeyStart(), i.GetPortion()->IndexKeyEnd()), true);
- if (!result.CanTakeMore()) {
+ if (!result.back().CanTakeMore()) {
// result.SetStopSeparation(i.GetPortion()->IndexKeyStart());
- break;
+ if (--tasksLeft <= 0) {
+ break;
+ }
+ result.emplace_back(NextLevel->GetLevelId(), CompactAtLevel ? NextLevel->GetExpectedPortionSize() : std::optional<ui64>());
}
}
+
+ if (result.back().IsEmpty()) {
+ result.pop_back();
+ }
+ AFL_VERIFY(!result.empty());
- if (result.CanTakeMore()) {
+ if (result.back().CanTakeMore()) {
PredOptimization = TInstant::Now();
} else {
PredOptimization = std::nullopt;
}
- return { result };
+ return result;
+}
+
+ui64 TZeroLevelPortions::GetMaxConcurrency() const {
+ return std::clamp(ui64(GetPortionsInfo().PredictPackedBlobBytes(GetPackKff()) / std::max(NextLevel->GetExpectedPortionSize(), GetExpectedPortionSize())), ui64(1), Concurrency);
}
ui64 TZeroLevelPortions::DoGetWeight(bool highPriority) const {
@@ -75,13 +89,14 @@ TInstant TZeroLevelPortions::DoGetWeightExpirationInstant() const {
TZeroLevelPortions::TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel,
const TLevelCounters& levelCounters, const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop,
const ui64 expectedBlobsSize, const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors,
- const TString& defaultSelectorName, const ui64 highPriorityContribution, bool compactAtLevel)
+ const TString& defaultSelectorName, const ui64 concurrency, const ui64 highPriorityContribution, bool compactAtLevel)
: TBase(levelIdx, nextLevel, overloadChecker, levelCounters, selectors, defaultSelectorName)
, DurationToDrop(durationToDrop)
, ExpectedBlobsSize(expectedBlobsSize)
, PortionsCountAvailable(portionsCountAvailable)
, HighPriorityContribution(highPriorityContribution)
- , CompactAtLevel(compactAtLevel) {
+ , CompactAtLevel(compactAtLevel)
+ , Concurrency(concurrency) {
if (DurationToDrop != TDuration::Max() && PredOptimization) {
*PredOptimization -= TDuration::Seconds(RandomNumber<ui32>(DurationToDrop.Seconds()));
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
index 092f1424d93..07e3a8482e2 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
@@ -12,6 +12,7 @@ private:
const ui64 PortionsCountAvailable;
const ui64 HighPriorityContribution;
const bool CompactAtLevel;
+ const ui64 Concurrency;
std::set<TOrderedPortion> Portions;
@@ -77,12 +78,14 @@ private:
virtual ui64 GetExpectedPortionSize() const override {
return ExpectedBlobsSize;
}
+
+ ui64 GetMaxConcurrency() const;
public:
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters,
const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop, const ui64 expectedBlobsSize,
const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, const TString& defaultSelectorName,
- const ui64 highPriorityContribution = 0, bool compactAtLevel = false);
+ const ui64 concurrency, const ui64 highPriorityContribution = 0, bool compactAtLevel = false);
};
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets