aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-02-11 20:31:54 +0300
committerAlexey Borzenkov <snaury@yandex-team.ru>2022-02-11 20:31:54 +0300
commit76c9f83980f89b39db53efda3e343c4ddb36491d (patch)
treed92eb9f6adcee736fd6cc029e3c04a68e431d17b
parent05ac286ec4ba733e0b158caa74c576e49b24b23e (diff)
downloadydb-76c9f83980f89b39db53efda3e343c4ddb36491d.tar.gz
Change forced compactions to gradually compact through generations, KIKIMR-14094
ref:00a0a510453b13a58df4070ae60c39e2577f4200
-rw-r--r--ydb/core/tablet_flat/flat_comp_gen.cpp264
-rw-r--r--ydb/core/tablet_flat/flat_comp_gen.h12
-rw-r--r--ydb/core/tablet_flat/ut/flat_comp_ut_common.h6
-rw-r--r--ydb/core/tablet_flat/ut/ut_comp_gen.cpp401
-rw-r--r--ydb/core/tablet_flat/ut/ut_comp_shard.cpp2
5 files changed, 495 insertions, 190 deletions
diff --git a/ydb/core/tablet_flat/flat_comp_gen.cpp b/ydb/core/tablet_flat/flat_comp_gen.cpp
index 43bf95357b..47c57fe192 100644
--- a/ydb/core/tablet_flat/flat_comp_gen.cpp
+++ b/ydb/core/tablet_flat/flat_comp_gen.cpp
@@ -302,13 +302,15 @@ void TGenCompactionStrategy::Stop() {
FinalCompactionId = 0;
FinalCompactionLevel = 0;
FinalCompactionTaken = 0;
- ForcedMemCompactionId = 0;
ForcedState = EForcedState::None;
+ ForcedMemCompactionId = 0;
+ ForcedGeneration = 0;
MaxOverloadFactor = 0.0;
CurrentForcedGenCompactionId = 0;
NextForcedGenCompactionId = 0;
FinishedForcedGenCompactionId = 0;
+ FinishedForcedGenCompactionTs = {};
// Make it possible to Start again
Generations.clear();
@@ -390,25 +392,14 @@ ui64 TGenCompactionStrategy::BeginMemCompaction(TTaskId taskId, TSnapEdge edge,
taskId,
edge,
/* generation */ 0,
- /* full */ false,
extra);
Y_VERIFY(MemCompactionId != 0);
if (forcedCompactionId != 0) {
- if (!Generations.empty()) {
- // We remember the last forced compaction we have started
- ForcedMemCompactionId = MemCompactionId;
- }
-
- switch (ForcedState) {
- case EForcedState::None:
- case EForcedState::Pending:
- CurrentForcedGenCompactionId = forcedCompactionId;
- break;
- case EForcedState::Compacting:
- NextForcedGenCompactionId = forcedCompactionId;
- break;
- }
+ // We remember the last forced mem compaction we have started
+ ForcedMemCompactionId = MemCompactionId;
+ // We also remember the last user-provided forced compaction id
+ NextForcedGenCompactionId = forcedCompactionId;
}
return MemCompactionId;
@@ -467,6 +458,8 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
TCompactionChanges changes;
+ TVector<bool> checkNeeded(Generations.size());
+
auto& sourceParts = params->Parts;
if (compactionId == FinalCompactionId || generation == 255) {
@@ -496,11 +489,12 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
}
sourceParts.erase(partStart, partEnd);
FinalCompactionId = 0;
+ // Compaction policy may have increased the number of generations
+ for (ui32 index : xrange(size_t(generation), Generations.size())) {
+ checkNeeded[index] = true;
+ }
}
- TVector<bool> checkNeeded(Generations.size());
- bool fullCompaction = false;
-
if (generation < Generations.size()) {
auto& nextGen = Generations[generation];
if (nextGen.TakenHeadParts != 0) {
@@ -525,19 +519,20 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
}
}
+ bool forcedCompactionContinue = false;
+
if (generation == 0) {
// This was a memtable compaction, we don't expect anything else
Y_VERIFY(sourceParts.empty());
// Check if we just finished the last forced mem compaction
if (ForcedMemCompactionId && compactionId == ForcedMemCompactionId) {
- if (KnownParts.empty() || Generations.empty()) {
- // All we have are new parts, there's no point in recompacting
- // everything again
- ForcedMemCompactionId = 0;
- } else if (ForcedState == EForcedState::Pending) {
- // Forced compaction hasn't started yet, don't need to requeue
- ForcedMemCompactionId = 0;
+ ForcedMemCompactionId = 0;
+
+ // Continue compaction when we don't have some other compaction running
+ if (ForcedState == EForcedState::None) {
+ CurrentForcedGenCompactionId = std::exchange(NextForcedGenCompactionId, 0);
+ forcedCompactionContinue = true;
}
}
} else if (generation == 255) {
@@ -547,22 +542,19 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
ui32 sourceIndex = generation - 1;
Y_VERIFY(sourceIndex < Generations.size());
- // During full compaction all other generations are idle
- fullCompaction = (ForcedState == EForcedState::Compacting);
+ if (ForcedState == EForcedState::Compacting && ForcedGeneration == generation) {
+ ForcedState = EForcedState::None;
+ forcedCompactionContinue = true;
+ }
while (sourceParts) {
- // Search back for the first generation that has non-empty parts
- while (fullCompaction && sourceIndex > 0 && Generations[sourceIndex].Parts.empty()) {
- --sourceIndex;
- }
-
auto& sourcePart = sourceParts.back();
auto& sourceGen = Generations[sourceIndex];
Y_VERIFY(sourceGen.Parts);
auto& part = sourceGen.Parts.back();
Y_VERIFY(part.Label == sourcePart->Label,
- "Failed at gen=%u, sourceIndex=%u, full=%d, headTaken=%lu",
- generation, sourceIndex, fullCompaction, sourceGen.TakenHeadParts);
+ "Failed at gen=%u, sourceIndex=%u, headTaken=%lu",
+ generation, sourceIndex, sourceGen.TakenHeadParts);
Y_VERIFY(sourceGen.CompactingTailParts > 0);
CachedGarbageBytes -= part.GarbageBytes;
KnownParts.erase(part.Label);
@@ -573,20 +565,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
Y_VERIFY(sourceParts.empty());
- for (ui32 index : xrange(sourceIndex, generation)) {
- Y_VERIFY(Generations[index].CompactingTailParts == 0);
- }
-
- if (fullCompaction) {
- ForcedState = EForcedState::None;
- OnForcedGenCompactionDone();
-
- // Will have to recheck all parent generations
- for (ui32 parent : xrange(1u, generation)) {
- checkNeeded[parent - 1] = true;
- }
- }
-
// Recheck compacted generation, it's free and may need a new compaction
checkNeeded[generation - 1] = true;
}
@@ -601,8 +579,36 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
newStats += TStats(partView);
}
+ // This will be an index where we place results
ui32 target = generation != 255 ? generation : Generations.size();
- while (newParts && target > 0) {
+
+ // After forced compaction we may want to place results as low as possible
+ if (forcedCompactionContinue) {
+ while (target < Generations.size()) {
+ auto& candidate = Generations[target];
+ if (!candidate.Parts.empty()) {
+ // Cannot pass non-empty generations
+ break;
+ }
+ // Try to move to the next generation
+ ++target;
+ }
+ if (target == Generations.size()) {
+ if (generation >= target || (FinalParts.empty() && ColdParts.empty())) {
+ // The forced compaction has finished, uplift logic will kick in below
+ forcedCompactionContinue = false;
+ if (target > generation) {
+ target = generation;
+ }
+ OnForcedGenCompactionDone();
+ } else {
+ // We need to compact final parts, so we need to place results at the last generation
+ --target;
+ }
+ }
+ }
+
+ while (newParts && target > 0 && !forcedCompactionContinue) {
auto& candidate = Generations[target - 1];
if (candidate.CompactingTailParts > 0) {
// Cannot uplift to busy generations
@@ -651,7 +657,7 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
}
} else {
auto& newGen = Generations[target];
- if (target != generation) {
+ if (target < generation) {
Y_VERIFY(!newGen.Parts || result->Epoch <= newGen.Parts.back().Epoch);
for (auto it = newParts.begin(); it != newParts.end(); ++it) {
auto& partView = *it;
@@ -680,24 +686,34 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
UpdateStats();
+ if (forcedCompactionContinue) {
+ Y_VERIFY(target < Generations.size());
+ ForcedState = EForcedState::Pending;
+ ForcedGeneration = target + 1;
+ checkNeeded[target] = true;
+ }
+
if (Generations) {
- bool needUpdateOverload = false;
+ // Check various conditions for starting a forced compaction
if (ForcedState == EForcedState::None) {
- if (ForcedMemCompactionId && ForcedMemCompactionId != MemCompactionId) {
- // The forced memtable compaction has finished, start gen compaction
- ForcedState = EForcedState::Pending;
- ForcedMemCompactionId = 0;
- needUpdateOverload = true;
+ bool startForcedCompaction = false;
+
+ if (NextForcedGenCompactionId && ForcedMemCompactionId == 0) {
+ // The previous forced compaction has finished, start gen compactions
+ CurrentForcedGenCompactionId = std::exchange(NextForcedGenCompactionId, 0);
+ startForcedCompaction = true;
} else if (Stats.DroppedRowsPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) {
// Table has too many dropped rows, compact everything
- ForcedState = EForcedState::Pending;
- needUpdateOverload = true;
+ startForcedCompaction = true;
} else if (CachedDroppedBytesPercent >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) {
// Table has too much garbage, compact everything
+ startForcedCompaction = true;
+ }
+
+ if (startForcedCompaction) {
ForcedState = EForcedState::Pending;
- needUpdateOverload = true;
- } else if (CurrentForcedGenCompactionId) {
- OnForcedGenCompactionDone();
+ ForcedGeneration = 1;
+ checkNeeded[0] = true;
}
}
@@ -709,14 +725,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
CheckGeneration(index + 1);
}
}
-
- if (needUpdateOverload && Generations.size() > 1 && !checkNeeded[1]) {
- // in case of forced compaction we must recalculate overload
- // to decrease it and upper reject threshold
- CheckOverload(1);
- }
- } else {
- OnForcedGenCompactionDone();
}
UpdateOverload();
@@ -725,45 +733,10 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
}
void TGenCompactionStrategy::OnForcedGenCompactionDone() {
- if (CurrentForcedGenCompactionId) {
- FinishedForcedGenCompactionId = CurrentForcedGenCompactionId;
- FinishedForcedGenCompactionTs = Time->Now();
- CurrentForcedGenCompactionId = 0;
- }
-
- if (NextForcedGenCompactionId) {
- CurrentForcedGenCompactionId = NextForcedGenCompactionId;
- NextForcedGenCompactionId = 0;
- }
-
- CheckForcedGenCompactionNeeded();
-}
-
-void TGenCompactionStrategy::CheckForcedGenCompactionNeeded() {
- // we already planned compaction
- if (ForcedState != EForcedState::None || ForcedMemCompactionId)
- return;
-
- if (CurrentForcedGenCompactionId <= FinishedForcedGenCompactionId) {
- CurrentForcedGenCompactionId = 0;
- return;
- }
-
- // note that CurrentForcedGenCompactionId != 0, because of check above
- if (!Generations) {
- FinishedForcedGenCompactionId = CurrentForcedGenCompactionId;
- FinishedForcedGenCompactionTs = Time->Now();
- CurrentForcedGenCompactionId = 0;
- return;
- }
-
- ForcedState = EForcedState::Pending;
- CheckGeneration(Generations.size());
- if (Generations.size() > 1) {
- // in case of forced compaction we must recalculate overload
- // to decrease it and upper reject threshold
- CheckOverload(1);
+ if (CurrentForcedGenCompactionId != 0) {
+ FinishedForcedGenCompactionId = std::exchange(CurrentForcedGenCompactionId, 0);
}
+ FinishedForcedGenCompactionTs = Time->Now();
}
void TGenCompactionStrategy::PartMerged(TPartView partView, ui32 level) {
@@ -865,7 +838,7 @@ void TGenCompactionStrategy::OutputHtml(IOutputStream& out) {
<< ", Epochs count: " << gen.PartEpochCount
<< ", Overload factor: " << gen.OverloadFactor
<< ", Compaction state: " << gen.State; }
- if (generation == Generations.size() && ForcedState != EForcedState::None) {
+ if (ForcedState != EForcedState::None && ForcedGeneration == generation) {
out << ", Forced compaction: " << ForcedState;
}
if (gen.Task.TaskId) {
@@ -964,7 +937,6 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation)
taskId,
/* edge */ { },
generation,
- /* full */ false,
extra);
FinalState.State = EState::Compacting;
return;
@@ -977,7 +949,7 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation)
Y_VERIFY(gen.State == EState::Pending || gen.State == EState::PendingBackground);
Y_VERIFY(gen.Task.TaskId == taskId);
- const bool full = NeedToForceCompact(generation);
+ const bool forced = NeedToForceCompact(generation);
auto cancelCompaction = [&]() {
Broker->FinishTask(taskId, EResourceStatus::Cancelled);
@@ -985,18 +957,7 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation)
gen.State = EState::Free;
- if (full) {
- // We just cancelled a forced compaction
- ForcedState = EForcedState::None;
- CheckForcedGenCompactionNeeded();
-
- // Make sure to kickstart all generations
- for (ui32 index : xrange(Generations.size())) {
- CheckGeneration(index + 1);
- }
-
- UpdateOverload();
- }
+ Y_VERIFY(!forced, "Unexpected cancellation of a forced compaction");
};
if (DesiredMode(generation) == EDesiredMode::None) {
@@ -1005,41 +966,8 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation)
return;
}
- if (full) {
- // We are going to start a forced compaction, cancel everything else
- // except a possible ongoing memtable compaction
- Y_VERIFY(FinalCompactionId == 0, "Full compaction while final compaction is running");
- for (ui32 index : xrange(generation - 1)) {
- auto& parent = Generations[index];
- auto& next = Generations[index + 1];
- switch (std::exchange(parent.State, EState::Free)) {
- case EState::Free:
- break;
- case EState::Pending:
- case EState::PendingBackground:
- // Compaction is scheduled, but not started yet
- Y_VERIFY(parent.CompactingTailParts == 0);
- Y_VERIFY(next.TakenHeadParts == 0);
- Broker->CancelTask(parent.Task.TaskId);
- break;
- case EState::Compacting:
- // Compaction is running right now, but we know it's
- // not a full compaction, so may only use parts between
- // parent and next.
- parent.CompactingTailParts = 0;
- next.TakenHeadParts = 0;
- next.TakenHeadBackingSize = 0;
- next.TakenHeadPartEpochCount = 0;
- Backend->CancelCompaction(parent.Task.CompactionId);
- break;
- }
- parent.Task.TaskId = 0;
- parent.Task.CompactionId = 0;
- }
- }
-
TExtraState extra;
- if (!full && generation < Generations.size()) {
+ if (!forced && generation < Generations.size()) {
extra.InitFromPolicy(Policy->Generations[generation]);
}
@@ -1051,14 +979,14 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation)
taskId,
/* edge */ { },
generation,
- full,
extra);
gen.State = EState::Compacting;
- if (full) {
+ if (forced) {
// We have just started a forced compaction
ForcedState = EForcedState::Compacting;
+ Y_VERIFY(ForcedGeneration == generation);
}
}
@@ -1105,8 +1033,8 @@ ui32 TGenCompactionStrategy::ComputeBackgroundPriority(
Y_VERIFY(generation > 0);
Y_VERIFY(generation <= Generations.size());
- if (generation == Generations.size() && ForcedState == EForcedState::Pending) {
- // This background compaction will be used for the full compaction
+ if (NeedToForceCompact(generation)) {
+ // This background compaction will be used for the forced compaction
// TODO: figure out which priority we should use for such compactions
return Policy->DefaultTaskPriority;
}
@@ -1164,10 +1092,6 @@ void TGenCompactionStrategy::CheckOverload(ui32 generation) {
// TODO: make lo and hi watermarks configurable
float loK = 1.5;
float hiK = 3;
- if (generation == 1 && ShouldIncreaseOverloadWhatermarts()) {
- loK *= 3;
- hiK *= 3;
- }
overloadFactor = Max(overloadFactor, mapToRange(genSize, genPolicy.ForceSizeToCompact*loK, genPolicy.ForceSizeToCompact*hiK));
overloadFactor = Max(overloadFactor, mapToRange(genParts, genPolicy.ForceCountToCompact*loK, genPolicy.ForceCountToCompact*hiK));
gen.OverloadFactor = overloadFactor;
@@ -1231,11 +1155,6 @@ TGenCompactionStrategy::EDesiredMode TGenCompactionStrategy::DesiredMode(ui32 ge
Y_VERIFY(generation > 0);
Y_VERIFY(generation <= Generations.size());
- if (ForcedState == EForcedState::Compacting) {
- // We cannot start new compactions during forced compaction
- return EDesiredMode::None;
- }
-
auto& gen = Generations[generation - 1];
ui64 genSize = gen.Stats.BackingSize - gen.TakenHeadBackingSize;
ui32 genParts = gen.PartEpochCount - gen.TakenHeadPartEpochCount;
@@ -1294,7 +1213,6 @@ ui64 TGenCompactionStrategy::PrepareCompaction(
TTaskId taskId,
TSnapEdge edge,
ui32 generation,
- bool full,
TExtraState& extra)
{
Y_VERIFY(generation <= Generations.size() || generation == 255);
@@ -1311,7 +1229,7 @@ ui64 TGenCompactionStrategy::PrepareCompaction(
if (generation > 0 && generation != 255) {
bool first = true;
- for (ui32 index : xrange(full ? 0u : generation - 1, generation)) {
+ for (ui32 index : xrange(generation - 1, generation)) {
auto& gen = Generations.at(index);
size_t skip = first ? gen.TakenHeadParts : 0;
Y_VERIFY(gen.TakenHeadParts == skip);
@@ -1335,7 +1253,7 @@ ui64 TGenCompactionStrategy::PrepareCompaction(
Y_VERIFY(nextGen.TakenHeadParts == 0);
Y_VERIFY(nextGen.TakenHeadBackingSize == 0);
Y_VERIFY(nextGen.TakenHeadPartEpochCount == 0);
- if (extra.ExtrasAllowed()) {
+ if (extra.ExtrasAllowed() && !NeedToForceCompact(generation + 1)) {
Y_VERIFY(nextGen.Parts.size() >= nextGen.CompactingTailParts);
size_t available = nextGen.Parts.size() - nextGen.CompactingTailParts;
TEpoch lastEpoch = TEpoch::Max();
diff --git a/ydb/core/tablet_flat/flat_comp_gen.h b/ydb/core/tablet_flat/flat_comp_gen.h
index fc38fe705e..79f7f689c6 100644
--- a/ydb/core/tablet_flat/flat_comp_gen.h
+++ b/ydb/core/tablet_flat/flat_comp_gen.h
@@ -213,7 +213,6 @@ namespace NCompGen {
void BeginGenCompaction(TTaskId taskId, ui32 generation);
void OnForcedGenCompactionDone();
- void CheckForcedGenCompactionNeeded();
ui32 ComputeBackgroundPriority(
ui32 generation,
@@ -235,21 +234,13 @@ namespace NCompGen {
TTaskId taskId,
TSnapEdge edge,
ui32 generation,
- bool full,
TExtraState& extra);
void UpdateStats();
void UpdateOverload();
bool NeedToForceCompact(ui32 generation) const {
- return (
- generation == Generations.size() &&
- ForcedState == EForcedState::Pending &&
- ForcedMemCompactionId == 0);
- }
-
- bool ShouldIncreaseOverloadWhatermarts() const {
- return ForcedState != EForcedState::None;
+ return (ForcedState == EForcedState::Pending && ForcedGeneration == generation);
}
private:
@@ -267,6 +258,7 @@ namespace NCompGen {
size_t FinalCompactionTaken = 0;
EForcedState ForcedState = EForcedState::None;
ui64 ForcedMemCompactionId = 0;
+ ui32 ForcedGeneration = 0;
float MaxOverloadFactor = 0.0;
ui64 CurrentForcedGenCompactionId = 0;
diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
index 66702b8c3f..0f6e025d72 100644
--- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
+++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
@@ -222,14 +222,15 @@ public:
RunCompaction(&params);
}
- void SimpleMemCompaction(ICompactionStrategy* strategy, bool forced = false) {
- ui64 forcedCompactionId = forced ? 1 : 0;
+ ui64 SimpleMemCompaction(ICompactionStrategy* strategy, bool forced = false) {
+ ui64 forcedCompactionId = forced ? NextForcedCompactionId_++ : 0;
ui64 compactionId = strategy->BeginMemCompaction(0, { 0, TEpoch::Max() }, forcedCompactionId);
auto outcome = RunCompaction(compactionId);
const ui32 table = outcome.Params->Table;
auto changes = strategy->CompactionFinished(
compactionId, std::move(outcome.Params), std::move(outcome.Result));
ApplyChanges(table, std::move(changes));
+ return forcedCompactionId;
}
bool SimpleTableCompaction(ui32 table, IResourceBroker* broker, ICompactionStrategy* strategy) {
@@ -328,6 +329,7 @@ private:
ui64 NextReadId_ = 1;
ui64 NextCompactionId_ = 1;
+ ui64 NextForcedCompactionId_ = 1001;
bool ChangesRequested_ = false;
};
diff --git a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
index 94ef1b5a92..de2a3caf07 100644
--- a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
+++ b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
@@ -4,8 +4,6 @@
#include <library/cpp/testing/unittest/registar.h>
-constexpr ui32 Table = 1;
-
namespace NKikimr {
namespace NTable {
namespace NCompGen {
@@ -14,6 +12,8 @@ using namespace NTest;
Y_UNIT_TEST_SUITE(TGenCompaction) {
+ constexpr ui32 Table = 1;
+
struct Schema : NIceDb::Schema {
struct Data : Table<1> {
struct Key : Column<1, NScheme::NTypeIds::Uint64> { };
@@ -26,7 +26,7 @@ Y_UNIT_TEST_SUITE(TGenCompaction) {
using TTables = SchemaTables<Data>;
};
- Y_UNIT_TEST(ShouldIncreaseOverloadWhenForceCompaction) {
+ Y_UNIT_TEST(OverloadFactorDuringForceCompaction) {
TSimpleBackend backend;
TSimpleBroker broker;
TSimpleLogger logger;
@@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(TGenCompaction) {
// forced compaction is in progress (waiting resource broker to start gen compaction)
UNIT_ASSERT(!strategy.AllowForcedCompaction());
- UNIT_ASSERT_VALUES_EQUAL(strategy.GetOverloadFactor(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetOverloadFactor(), 1);
// finish forced compaction
while (broker.HasPending()) {
@@ -92,6 +92,399 @@ Y_UNIT_TEST_SUITE(TGenCompaction) {
UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(strategy.GetOverloadFactor(), 0);
}
+
+ Y_UNIT_TEST(ForcedCompactionNoGenerations) {
+ TSimpleBackend backend;
+ TSimpleBroker broker;
+ TSimpleLogger logger;
+ TSimpleTime time;
+
+ // Initialize the schema
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ }
+
+ TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix");
+ strategy.Start({ });
+
+ // Insert some rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 64; ++key) {
+ db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42);
+ }
+ backend.Commit();
+ }
+
+ // Start a forced mem compaction with id 123
+ {
+ auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 123);
+ UNIT_ASSERT(memCompactionId != 0);
+ auto outcome = backend.RunCompaction(memCompactionId);
+
+ UNIT_ASSERT(outcome.Params);
+ UNIT_ASSERT(!outcome.Params->Parts);
+ UNIT_ASSERT(outcome.Params->IsFinal);
+
+ auto changes = strategy.CompactionFinished(
+ memCompactionId, std::move(outcome.Params), std::move(outcome.Result));
+
+ // We expect forced compaction to place results on level 255
+ UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 255u);
+
+ // We expect forced compaction to be finished and a new one immediately allowed
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u);
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ }
+
+ // Insert some more rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 64; key < 128; ++key) {
+ db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42);
+ }
+ backend.Commit();
+ }
+
+ // Start a forced mem compaction with id 234
+ {
+ auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 234);
+ UNIT_ASSERT(memCompactionId != 0);
+ auto outcome = backend.RunCompaction(memCompactionId);
+
+ UNIT_ASSERT(outcome.Params);
+ UNIT_ASSERT(outcome.Params->Parts);
+ UNIT_ASSERT(outcome.Params->IsFinal);
+
+ auto changes = strategy.CompactionFinished(
+ memCompactionId, std::move(outcome.Params), std::move(outcome.Result));
+
+ // We expect forced compaction to place results on level 255
+ UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 255u);
+
+ // We expect forced compaction to be finished and a new one immediately allowed
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 234);
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ }
+
+ // Don't expect any tasks or change requests
+ UNIT_ASSERT(!broker.HasPending());
+ UNIT_ASSERT(!backend.PendingReads);
+ UNIT_ASSERT(!backend.StartedCompactions);
+ UNIT_ASSERT(!backend.CheckChangesFlag());
+ }
+
+ Y_UNIT_TEST(ForcedCompactionWithGenerations) {
+ TSimpleBackend backend;
+ TSimpleBroker broker;
+ TSimpleLogger logger;
+ TSimpleTime time;
+
+ // Initialize the schema
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true);
+ policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true);
+ policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true);
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ }
+
+ TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix");
+ strategy.Start({ });
+
+ // Don't expect any tasks or change requests
+ UNIT_ASSERT(!broker.HasPending());
+ UNIT_ASSERT(!backend.PendingReads);
+ UNIT_ASSERT(!backend.StartedCompactions);
+ UNIT_ASSERT(!backend.CheckChangesFlag());
+
+ // Insert some rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 64; ++key) {
+ db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42);
+ }
+ backend.Commit();
+ }
+
+ // Start a forced mem compaction with id 123
+ {
+ auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 123);
+ UNIT_ASSERT(memCompactionId != 0);
+ auto outcome = backend.RunCompaction(memCompactionId);
+
+ UNIT_ASSERT(outcome.Params);
+ UNIT_ASSERT(!outcome.Params->Parts);
+ UNIT_ASSERT(outcome.Params->IsFinal);
+
+ auto changes = strategy.CompactionFinished(
+ memCompactionId, std::move(outcome.Params), std::move(outcome.Result));
+
+ // We expect forced compaction to place results on level 1
+ UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u);
+
+ // We expect forced compaction to be finished and a new one immediately allowed
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u);
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ }
+
+ // Don't expect any tasks or change requests
+ UNIT_ASSERT(!broker.HasPending());
+ UNIT_ASSERT(!backend.PendingReads);
+ UNIT_ASSERT(!backend.StartedCompactions);
+ UNIT_ASSERT(!backend.CheckChangesFlag());
+
+ // Insert some more rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 64; key < 128; ++key) {
+ db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42);
+ }
+ backend.Commit();
+ }
+
+ // Start one more force compaction with id 234
+ {
+ auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 234);
+ UNIT_ASSERT(memCompactionId != 0);
+ auto outcome = backend.RunCompaction(memCompactionId);
+
+ UNIT_ASSERT(outcome.Params);
+ UNIT_ASSERT(!outcome.Params->Parts);
+ UNIT_ASSERT(!outcome.Params->IsFinal);
+
+ auto changes = strategy.CompactionFinished(
+ memCompactionId, std::move(outcome.Params), std::move(outcome.Result));
+
+ // We expect forced compaction to again place results (there are none) on level 1
+ UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u);
+
+ // We expect compaction 234 not to be finished yet
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u);
+ UNIT_ASSERT(!strategy.AllowForcedCompaction());
+ }
+
+ // There should be a compaction task pending right now
+ UNIT_ASSERT(broker.RunPending());
+ UNIT_ASSERT(!broker.HasPending());
+
+ // There should be compaction started right now
+ UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u);
+
+ // Perform this compaction
+ {
+ auto outcome = backend.RunCompaction();
+ UNIT_ASSERT(outcome.Params->Parts);
+ UNIT_ASSERT(outcome.Params->IsFinal);
+ UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u);
+
+ auto* genParams = CheckedCast<TGenCompactionParams*>(outcome.Params.Get());
+ UNIT_ASSERT_VALUES_EQUAL(genParams->Generation, 1u);
+
+ auto changes = strategy.CompactionFinished(
+ outcome.CompactionId, std::move(outcome.Params), std::move(outcome.Result));
+
+ // We expect the result to be uplifted to level 1
+ UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u);
+
+ // We expect compaction 234 to be finished
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 234u);
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ }
+ }
+
+ Y_UNIT_TEST(ForcedCompactionWithFinalParts) {
+ TSimpleBackend backend;
+ TSimpleBroker broker;
+ TSimpleLogger logger;
+ TSimpleTime time;
+
+ // Initialize the schema
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ }
+
+ TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix");
+ strategy.Start({ });
+
+ // Insert some rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 64; ++key) {
+ db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42);
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy);
+
+ // Alter schema to policy with generations
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true);
+ policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true);
+ policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true);
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ strategy.ReflectSchema();
+ }
+
+ // Insert some more rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 64; key < 128; ++key) {
+ db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42);
+ }
+ backend.Commit();
+ }
+
+ // Start force compaction with id 123
+ {
+ auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 123);
+ UNIT_ASSERT(memCompactionId != 0);
+ auto outcome = backend.RunCompaction(memCompactionId);
+
+ UNIT_ASSERT(outcome.Params);
+ UNIT_ASSERT(!outcome.Params->Parts);
+ UNIT_ASSERT(!outcome.Params->IsFinal);
+
+ auto changes = strategy.CompactionFinished(
+ memCompactionId, std::move(outcome.Params), std::move(outcome.Result));
+
+ // We expect forced compaction to place results before final parts on level 3
+ UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 3u);
+
+ // We expect compaction 123 not to be finished yet
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 0u);
+ UNIT_ASSERT(!strategy.AllowForcedCompaction());
+ }
+
+ // There should be a compaction task pending right now
+ UNIT_ASSERT(broker.RunPending());
+ UNIT_ASSERT(!broker.HasPending());
+
+ // There should be compaction started right now
+ UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u);
+
+ // Perform this compaction
+ {
+ auto outcome = backend.RunCompaction();
+ UNIT_ASSERT(outcome.Params->IsFinal);
+ UNIT_ASSERT_VALUES_EQUAL(outcome.Params->Parts.size(), 2u);
+ UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u);
+
+ auto* genParams = CheckedCast<TGenCompactionParams*>(outcome.Params.Get());
+ UNIT_ASSERT_VALUES_EQUAL(genParams->Generation, 3u);
+
+ auto changes = strategy.CompactionFinished(
+ outcome.CompactionId, std::move(outcome.Params), std::move(outcome.Result));
+
+ // We expect the result to be uplifted to level 1
+ UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u);
+
+ // We expect compaction 123 to be finished
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u);
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ }
+ }
+
+ Y_UNIT_TEST(ForcedCompactionByDeletedRows) {
+ TSimpleBackend backend;
+ TSimpleBroker broker;
+ TSimpleLogger logger;
+ TSimpleTime time;
+
+ // Initialize the schema
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ policy.DroppedRowsPercentToCompact = 50;
+ policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true);
+ policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true);
+ policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true);
+ for (auto& gen : policy.Generations) {
+ gen.ExtraCompactionPercent = 0;
+ gen.ExtraCompactionMinSize = 0;
+ gen.ExtraCompactionExpPercent = 0;
+ gen.ExtraCompactionExpMaxSize = 0;
+ }
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ }
+
+ TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix");
+ strategy.Start({ });
+
+ // Insert some rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 16; ++key) {
+ db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42);
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy);
+
+ // Erase more than 50% of rows
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 10; ++key) {
+ db.Table<Schema::Data>().Key(key).Delete();
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy);
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 2u);
+
+ // We expect a forced compaction to be pending right now
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionTs(), TInstant());
+ UNIT_ASSERT(!strategy.AllowForcedCompaction());
+ UNIT_ASSERT(broker.HasPending());
+
+ time.Move(TInstant::Seconds(60));
+
+ // finish forced compactions
+ while (broker.HasPending()) {
+ UNIT_ASSERT(broker.RunPending());
+ if (backend.StartedCompactions.empty())
+ continue;
+
+ auto result = backend.RunCompaction();
+ auto changes = strategy.CompactionFinished(
+ result.CompactionId, std::move(result.Params), std::move(result.Result));
+ backend.ApplyChanges(Table, std::move(changes));
+ }
+
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionTs(), TInstant::Seconds(60));
+ }
+
};
} // NCompGen
diff --git a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp b/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
index 525063d30c..c2dba25ab9 100644
--- a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
+++ b/ydb/core/tablet_flat/ut/ut_comp_shard.cpp
@@ -972,7 +972,7 @@ Y_UNIT_TEST_SUITE(TShardedCompactionScenarios) {
// Start a memtable compaction using this strategy
{
- auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, false);
+ auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 0);
UNIT_ASSERT(memCompactionId != 0);
auto outcome = backend.RunCompaction(memCompactionId);