diff options
author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-11 20:31:54 +0300 |
---|---|---|
committer | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-02-11 20:31:54 +0300 |
commit | 76c9f83980f89b39db53efda3e343c4ddb36491d (patch) | |
tree | d92eb9f6adcee736fd6cc029e3c04a68e431d17b | |
parent | 05ac286ec4ba733e0b158caa74c576e49b24b23e (diff) | |
download | ydb-76c9f83980f89b39db53efda3e343c4ddb36491d.tar.gz |
Change forced compactions to gradually compact through generations, KIKIMR-14094
ref:00a0a510453b13a58df4070ae60c39e2577f4200
-rw-r--r-- | ydb/core/tablet_flat/flat_comp_gen.cpp | 264 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_comp_gen.h | 12 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/flat_comp_ut_common.h | 6 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/ut_comp_gen.cpp | 401 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/ut_comp_shard.cpp | 2 |
5 files changed, 495 insertions, 190 deletions
diff --git a/ydb/core/tablet_flat/flat_comp_gen.cpp b/ydb/core/tablet_flat/flat_comp_gen.cpp index 43bf95357b..47c57fe192 100644 --- a/ydb/core/tablet_flat/flat_comp_gen.cpp +++ b/ydb/core/tablet_flat/flat_comp_gen.cpp @@ -302,13 +302,15 @@ void TGenCompactionStrategy::Stop() { FinalCompactionId = 0; FinalCompactionLevel = 0; FinalCompactionTaken = 0; - ForcedMemCompactionId = 0; ForcedState = EForcedState::None; + ForcedMemCompactionId = 0; + ForcedGeneration = 0; MaxOverloadFactor = 0.0; CurrentForcedGenCompactionId = 0; NextForcedGenCompactionId = 0; FinishedForcedGenCompactionId = 0; + FinishedForcedGenCompactionTs = {}; // Make it possible to Start again Generations.clear(); @@ -390,25 +392,14 @@ ui64 TGenCompactionStrategy::BeginMemCompaction(TTaskId taskId, TSnapEdge edge, taskId, edge, /* generation */ 0, - /* full */ false, extra); Y_VERIFY(MemCompactionId != 0); if (forcedCompactionId != 0) { - if (!Generations.empty()) { - // We remember the last forced compaction we have started - ForcedMemCompactionId = MemCompactionId; - } - - switch (ForcedState) { - case EForcedState::None: - case EForcedState::Pending: - CurrentForcedGenCompactionId = forcedCompactionId; - break; - case EForcedState::Compacting: - NextForcedGenCompactionId = forcedCompactionId; - break; - } + // We remember the last forced mem compaction we have started + ForcedMemCompactionId = MemCompactionId; + // We also remember the last user-provided forced compaction id + NextForcedGenCompactionId = forcedCompactionId; } return MemCompactionId; @@ -467,6 +458,8 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( TCompactionChanges changes; + TVector<bool> checkNeeded(Generations.size()); + auto& sourceParts = params->Parts; if (compactionId == FinalCompactionId || generation == 255) { @@ -496,11 +489,12 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( } sourceParts.erase(partStart, partEnd); FinalCompactionId = 0; + // Compaction policy may have increased the number of generations + for (ui32 index : xrange(size_t(generation), Generations.size())) { + checkNeeded[index] = true; + } } - TVector<bool> checkNeeded(Generations.size()); - bool fullCompaction = false; - if (generation < Generations.size()) { auto& nextGen = Generations[generation]; if (nextGen.TakenHeadParts != 0) { @@ -525,19 +519,20 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( } } + bool forcedCompactionContinue = false; + if (generation == 0) { // This was a memtable compaction, we don't expect anything else Y_VERIFY(sourceParts.empty()); // Check if we just finished the last forced mem compaction if (ForcedMemCompactionId && compactionId == ForcedMemCompactionId) { - if (KnownParts.empty() || Generations.empty()) { - // All we have are new parts, there's no point in recompacting - // everything again - ForcedMemCompactionId = 0; - } else if (ForcedState == EForcedState::Pending) { - // Forced compaction hasn't started yet, don't need to requeue - ForcedMemCompactionId = 0; + ForcedMemCompactionId = 0; + + // Continue compaction when we don't have some other compaction running + if (ForcedState == EForcedState::None) { + CurrentForcedGenCompactionId = std::exchange(NextForcedGenCompactionId, 0); + forcedCompactionContinue = true; } } } else if (generation == 255) { @@ -547,22 +542,19 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( ui32 sourceIndex = generation - 1; Y_VERIFY(sourceIndex < Generations.size()); - // During full compaction all other generations are idle - fullCompaction = (ForcedState == EForcedState::Compacting); + if (ForcedState == EForcedState::Compacting && ForcedGeneration == generation) { + ForcedState = EForcedState::None; + forcedCompactionContinue = true; + } while (sourceParts) { - // Search back for the first generation that has non-empty parts - while (fullCompaction && sourceIndex > 0 && Generations[sourceIndex].Parts.empty()) { - --sourceIndex; - } - auto& sourcePart = sourceParts.back(); auto& sourceGen = Generations[sourceIndex]; Y_VERIFY(sourceGen.Parts); auto& part = sourceGen.Parts.back(); Y_VERIFY(part.Label == sourcePart->Label, - "Failed at gen=%u, sourceIndex=%u, full=%d, headTaken=%lu", - generation, sourceIndex, fullCompaction, sourceGen.TakenHeadParts); + "Failed at gen=%u, sourceIndex=%u, headTaken=%lu", + generation, sourceIndex, sourceGen.TakenHeadParts); Y_VERIFY(sourceGen.CompactingTailParts > 0); CachedGarbageBytes -= part.GarbageBytes; KnownParts.erase(part.Label); @@ -573,20 +565,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( Y_VERIFY(sourceParts.empty()); - for (ui32 index : xrange(sourceIndex, generation)) { - Y_VERIFY(Generations[index].CompactingTailParts == 0); - } - - if (fullCompaction) { - ForcedState = EForcedState::None; - OnForcedGenCompactionDone(); - - // Will have to recheck all parent generations - for (ui32 parent : xrange(1u, generation)) { - checkNeeded[parent - 1] = true; - } - } - // Recheck compacted generation, it's free and may need a new compaction checkNeeded[generation - 1] = true; } @@ -601,8 +579,36 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( newStats += TStats(partView); } + // This will be an index where we place results ui32 target = generation != 255 ? generation : Generations.size(); - while (newParts && target > 0) { + + // After forced compaction we may want to place results as low as possible + if (forcedCompactionContinue) { + while (target < Generations.size()) { + auto& candidate = Generations[target]; + if (!candidate.Parts.empty()) { + // Cannot pass non-empty generations + break; + } + // Try to move to the next generation + ++target; + } + if (target == Generations.size()) { + if (generation >= target || (FinalParts.empty() && ColdParts.empty())) { + // The forced compaction has finished, uplift logic will kick in below + forcedCompactionContinue = false; + if (target > generation) { + target = generation; + } + OnForcedGenCompactionDone(); + } else { + // We need to compact final parts, so we need to place results at the last generation + --target; + } + } + } + + while (newParts && target > 0 && !forcedCompactionContinue) { auto& candidate = Generations[target - 1]; if (candidate.CompactingTailParts > 0) { // Cannot uplift to busy generations @@ -651,7 +657,7 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( } } else { auto& newGen = Generations[target]; - if (target != generation) { + if (target < generation) { Y_VERIFY(!newGen.Parts || result->Epoch <= newGen.Parts.back().Epoch); for (auto it = newParts.begin(); it != newParts.end(); ++it) { auto& partView = *it; @@ -680,24 +686,34 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( UpdateStats(); + if (forcedCompactionContinue) { + Y_VERIFY(target < Generations.size()); + ForcedState = EForcedState::Pending; + ForcedGeneration = target + 1; + checkNeeded[target] = true; + } + if (Generations) { - bool needUpdateOverload = false; + // Check various conditions for starting a forced compaction if (ForcedState == EForcedState::None) { - if (ForcedMemCompactionId && ForcedMemCompactionId != MemCompactionId) { - // The forced memtable compaction has finished, start gen compaction - ForcedState = EForcedState::Pending; - ForcedMemCompactionId = 0; - needUpdateOverload = true; + bool startForcedCompaction = false; + + if (NextForcedGenCompactionId && ForcedMemCompactionId == 0) { + // The previous forced compaction has finished, start gen compactions + CurrentForcedGenCompactionId = std::exchange(NextForcedGenCompactionId, 0); + startForcedCompaction = true; } else if (Stats.DroppedRowsPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) { // Table has too many dropped rows, compact everything - ForcedState = EForcedState::Pending; - needUpdateOverload = true; + startForcedCompaction = true; } else if (CachedDroppedBytesPercent >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) { // Table has too much garbage, compact everything + startForcedCompaction = true; + } + + if (startForcedCompaction) { ForcedState = EForcedState::Pending; - needUpdateOverload = true; - } else if (CurrentForcedGenCompactionId) { - OnForcedGenCompactionDone(); + ForcedGeneration = 1; + checkNeeded[0] = true; } } @@ -709,14 +725,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( CheckGeneration(index + 1); } } - - if (needUpdateOverload && Generations.size() > 1 && !checkNeeded[1]) { - // in case of forced compaction we must recalculate overload - // to decrease it and upper reject threshold - CheckOverload(1); - } - } else { - OnForcedGenCompactionDone(); } UpdateOverload(); @@ -725,45 +733,10 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( } void TGenCompactionStrategy::OnForcedGenCompactionDone() { - if (CurrentForcedGenCompactionId) { - FinishedForcedGenCompactionId = CurrentForcedGenCompactionId; - FinishedForcedGenCompactionTs = Time->Now(); - CurrentForcedGenCompactionId = 0; - } - - if (NextForcedGenCompactionId) { - CurrentForcedGenCompactionId = NextForcedGenCompactionId; - NextForcedGenCompactionId = 0; - } - - CheckForcedGenCompactionNeeded(); -} - -void TGenCompactionStrategy::CheckForcedGenCompactionNeeded() { - // we already planned compaction - if (ForcedState != EForcedState::None || ForcedMemCompactionId) - return; - - if (CurrentForcedGenCompactionId <= FinishedForcedGenCompactionId) { - CurrentForcedGenCompactionId = 0; - return; - } - - // note that CurrentForcedGenCompactionId != 0, because of check above - if (!Generations) { - FinishedForcedGenCompactionId = CurrentForcedGenCompactionId; - FinishedForcedGenCompactionTs = Time->Now(); - CurrentForcedGenCompactionId = 0; - return; - } - - ForcedState = EForcedState::Pending; - CheckGeneration(Generations.size()); - if (Generations.size() > 1) { - // in case of forced compaction we must recalculate overload - // to decrease it and upper reject threshold - CheckOverload(1); + if (CurrentForcedGenCompactionId != 0) { + FinishedForcedGenCompactionId = std::exchange(CurrentForcedGenCompactionId, 0); } + FinishedForcedGenCompactionTs = Time->Now(); } void TGenCompactionStrategy::PartMerged(TPartView partView, ui32 level) { @@ -865,7 +838,7 @@ void TGenCompactionStrategy::OutputHtml(IOutputStream& out) { << ", Epochs count: " << gen.PartEpochCount << ", Overload factor: " << gen.OverloadFactor << ", Compaction state: " << gen.State; } - if (generation == Generations.size() && ForcedState != EForcedState::None) { + if (ForcedState != EForcedState::None && ForcedGeneration == generation) { out << ", Forced compaction: " << ForcedState; } if (gen.Task.TaskId) { @@ -964,7 +937,6 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation) taskId, /* edge */ { }, generation, - /* full */ false, extra); FinalState.State = EState::Compacting; return; @@ -977,7 +949,7 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation) Y_VERIFY(gen.State == EState::Pending || gen.State == EState::PendingBackground); Y_VERIFY(gen.Task.TaskId == taskId); - const bool full = NeedToForceCompact(generation); + const bool forced = NeedToForceCompact(generation); auto cancelCompaction = [&]() { Broker->FinishTask(taskId, EResourceStatus::Cancelled); @@ -985,18 +957,7 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation) gen.State = EState::Free; - if (full) { - // We just cancelled a forced compaction - ForcedState = EForcedState::None; - CheckForcedGenCompactionNeeded(); - - // Make sure to kickstart all generations - for (ui32 index : xrange(Generations.size())) { - CheckGeneration(index + 1); - } - - UpdateOverload(); - } + Y_VERIFY(!forced, "Unexpected cancellation of a forced compaction"); }; if (DesiredMode(generation) == EDesiredMode::None) { @@ -1005,41 +966,8 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation) return; } - if (full) { - // We are going to start a forced compaction, cancel everything else - // except a possible ongoing memtable compaction - Y_VERIFY(FinalCompactionId == 0, "Full compaction while final compaction is running"); - for (ui32 index : xrange(generation - 1)) { - auto& parent = Generations[index]; - auto& next = Generations[index + 1]; - switch (std::exchange(parent.State, EState::Free)) { - case EState::Free: - break; - case EState::Pending: - case EState::PendingBackground: - // Compaction is scheduled, but not started yet - Y_VERIFY(parent.CompactingTailParts == 0); - Y_VERIFY(next.TakenHeadParts == 0); - Broker->CancelTask(parent.Task.TaskId); - break; - case EState::Compacting: - // Compaction is running right now, but we know it's - // not a full compaction, so may only use parts between - // parent and next. - parent.CompactingTailParts = 0; - next.TakenHeadParts = 0; - next.TakenHeadBackingSize = 0; - next.TakenHeadPartEpochCount = 0; - Backend->CancelCompaction(parent.Task.CompactionId); - break; - } - parent.Task.TaskId = 0; - parent.Task.CompactionId = 0; - } - } - TExtraState extra; - if (!full && generation < Generations.size()) { + if (!forced && generation < Generations.size()) { extra.InitFromPolicy(Policy->Generations[generation]); } @@ -1051,14 +979,14 @@ void TGenCompactionStrategy::BeginGenCompaction(TTaskId taskId, ui32 generation) taskId, /* edge */ { }, generation, - full, extra); gen.State = EState::Compacting; - if (full) { + if (forced) { // We have just started a forced compaction ForcedState = EForcedState::Compacting; + Y_VERIFY(ForcedGeneration == generation); } } @@ -1105,8 +1033,8 @@ ui32 TGenCompactionStrategy::ComputeBackgroundPriority( Y_VERIFY(generation > 0); Y_VERIFY(generation <= Generations.size()); - if (generation == Generations.size() && ForcedState == EForcedState::Pending) { - // This background compaction will be used for the full compaction + if (NeedToForceCompact(generation)) { + // This background compaction will be used for the forced compaction // TODO: figure out which priority we should use for such compactions return Policy->DefaultTaskPriority; } @@ -1164,10 +1092,6 @@ void TGenCompactionStrategy::CheckOverload(ui32 generation) { // TODO: make lo and hi watermarks configurable float loK = 1.5; float hiK = 3; - if (generation == 1 && ShouldIncreaseOverloadWhatermarts()) { - loK *= 3; - hiK *= 3; - } overloadFactor = Max(overloadFactor, mapToRange(genSize, genPolicy.ForceSizeToCompact*loK, genPolicy.ForceSizeToCompact*hiK)); overloadFactor = Max(overloadFactor, mapToRange(genParts, genPolicy.ForceCountToCompact*loK, genPolicy.ForceCountToCompact*hiK)); gen.OverloadFactor = overloadFactor; @@ -1231,11 +1155,6 @@ TGenCompactionStrategy::EDesiredMode TGenCompactionStrategy::DesiredMode(ui32 ge Y_VERIFY(generation > 0); Y_VERIFY(generation <= Generations.size()); - if (ForcedState == EForcedState::Compacting) { - // We cannot start new compactions during forced compaction - return EDesiredMode::None; - } - auto& gen = Generations[generation - 1]; ui64 genSize = gen.Stats.BackingSize - gen.TakenHeadBackingSize; ui32 genParts = gen.PartEpochCount - gen.TakenHeadPartEpochCount; @@ -1294,7 +1213,6 @@ ui64 TGenCompactionStrategy::PrepareCompaction( TTaskId taskId, TSnapEdge edge, ui32 generation, - bool full, TExtraState& extra) { Y_VERIFY(generation <= Generations.size() || generation == 255); @@ -1311,7 +1229,7 @@ ui64 TGenCompactionStrategy::PrepareCompaction( if (generation > 0 && generation != 255) { bool first = true; - for (ui32 index : xrange(full ? 0u : generation - 1, generation)) { + for (ui32 index : xrange(generation - 1, generation)) { auto& gen = Generations.at(index); size_t skip = first ? gen.TakenHeadParts : 0; Y_VERIFY(gen.TakenHeadParts == skip); @@ -1335,7 +1253,7 @@ ui64 TGenCompactionStrategy::PrepareCompaction( Y_VERIFY(nextGen.TakenHeadParts == 0); Y_VERIFY(nextGen.TakenHeadBackingSize == 0); Y_VERIFY(nextGen.TakenHeadPartEpochCount == 0); - if (extra.ExtrasAllowed()) { + if (extra.ExtrasAllowed() && !NeedToForceCompact(generation + 1)) { Y_VERIFY(nextGen.Parts.size() >= nextGen.CompactingTailParts); size_t available = nextGen.Parts.size() - nextGen.CompactingTailParts; TEpoch lastEpoch = TEpoch::Max(); diff --git a/ydb/core/tablet_flat/flat_comp_gen.h b/ydb/core/tablet_flat/flat_comp_gen.h index fc38fe705e..79f7f689c6 100644 --- a/ydb/core/tablet_flat/flat_comp_gen.h +++ b/ydb/core/tablet_flat/flat_comp_gen.h @@ -213,7 +213,6 @@ namespace NCompGen { void BeginGenCompaction(TTaskId taskId, ui32 generation); void OnForcedGenCompactionDone(); - void CheckForcedGenCompactionNeeded(); ui32 ComputeBackgroundPriority( ui32 generation, @@ -235,21 +234,13 @@ namespace NCompGen { TTaskId taskId, TSnapEdge edge, ui32 generation, - bool full, TExtraState& extra); void UpdateStats(); void UpdateOverload(); bool NeedToForceCompact(ui32 generation) const { - return ( - generation == Generations.size() && - ForcedState == EForcedState::Pending && - ForcedMemCompactionId == 0); - } - - bool ShouldIncreaseOverloadWhatermarts() const { - return ForcedState != EForcedState::None; + return (ForcedState == EForcedState::Pending && ForcedGeneration == generation); } private: @@ -267,6 +258,7 @@ namespace NCompGen { size_t FinalCompactionTaken = 0; EForcedState ForcedState = EForcedState::None; ui64 ForcedMemCompactionId = 0; + ui32 ForcedGeneration = 0; float MaxOverloadFactor = 0.0; ui64 CurrentForcedGenCompactionId = 0; diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h index 66702b8c3f..0f6e025d72 100644 --- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h +++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h @@ -222,14 +222,15 @@ public: RunCompaction(¶ms); } - void SimpleMemCompaction(ICompactionStrategy* strategy, bool forced = false) { - ui64 forcedCompactionId = forced ? 1 : 0; + ui64 SimpleMemCompaction(ICompactionStrategy* strategy, bool forced = false) { + ui64 forcedCompactionId = forced ? NextForcedCompactionId_++ : 0; ui64 compactionId = strategy->BeginMemCompaction(0, { 0, TEpoch::Max() }, forcedCompactionId); auto outcome = RunCompaction(compactionId); const ui32 table = outcome.Params->Table; auto changes = strategy->CompactionFinished( compactionId, std::move(outcome.Params), std::move(outcome.Result)); ApplyChanges(table, std::move(changes)); + return forcedCompactionId; } bool SimpleTableCompaction(ui32 table, IResourceBroker* broker, ICompactionStrategy* strategy) { @@ -328,6 +329,7 @@ private: ui64 NextReadId_ = 1; ui64 NextCompactionId_ = 1; + ui64 NextForcedCompactionId_ = 1001; bool ChangesRequested_ = false; }; diff --git a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp index 94ef1b5a92..de2a3caf07 100644 --- a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp +++ b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp @@ -4,8 +4,6 @@ #include <library/cpp/testing/unittest/registar.h> -constexpr ui32 Table = 1; - namespace NKikimr { namespace NTable { namespace NCompGen { @@ -14,6 +12,8 @@ using namespace NTest; Y_UNIT_TEST_SUITE(TGenCompaction) { + constexpr ui32 Table = 1; + struct Schema : NIceDb::Schema { struct Data : Table<1> { struct Key : Column<1, NScheme::NTypeIds::Uint64> { }; @@ -26,7 +26,7 @@ Y_UNIT_TEST_SUITE(TGenCompaction) { using TTables = SchemaTables<Data>; }; - Y_UNIT_TEST(ShouldIncreaseOverloadWhenForceCompaction) { + Y_UNIT_TEST(OverloadFactorDuringForceCompaction) { TSimpleBackend backend; TSimpleBroker broker; TSimpleLogger logger; @@ -74,7 +74,7 @@ Y_UNIT_TEST_SUITE(TGenCompaction) { // forced compaction is in progress (waiting resource broker to start gen compaction) UNIT_ASSERT(!strategy.AllowForcedCompaction()); - UNIT_ASSERT_VALUES_EQUAL(strategy.GetOverloadFactor(), 0); + UNIT_ASSERT_VALUES_EQUAL(strategy.GetOverloadFactor(), 1); // finish forced compaction while (broker.HasPending()) { @@ -92,6 +92,399 @@ Y_UNIT_TEST_SUITE(TGenCompaction) { UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1UL); UNIT_ASSERT_VALUES_EQUAL(strategy.GetOverloadFactor(), 0); } + + Y_UNIT_TEST(ForcedCompactionNoGenerations) { + TSimpleBackend backend; + TSimpleBroker broker; + TSimpleLogger logger; + TSimpleTime time; + + // Initialize the schema + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + } + + TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix"); + strategy.Start({ }); + + // Insert some rows + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 64; ++key) { + db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42); + } + backend.Commit(); + } + + // Start a forced mem compaction with id 123 + { + auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 123); + UNIT_ASSERT(memCompactionId != 0); + auto outcome = backend.RunCompaction(memCompactionId); + + UNIT_ASSERT(outcome.Params); + UNIT_ASSERT(!outcome.Params->Parts); + UNIT_ASSERT(outcome.Params->IsFinal); + + auto changes = strategy.CompactionFinished( + memCompactionId, std::move(outcome.Params), std::move(outcome.Result)); + + // We expect forced compaction to place results on level 255 + UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 255u); + + // We expect forced compaction to be finished and a new one immediately allowed + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u); + UNIT_ASSERT(strategy.AllowForcedCompaction()); + } + + // Insert some more rows + { + auto db = backend.Begin(); + for (ui64 key = 64; key < 128; ++key) { + db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42); + } + backend.Commit(); + } + + // Start a forced mem compaction with id 234 + { + auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 234); + UNIT_ASSERT(memCompactionId != 0); + auto outcome = backend.RunCompaction(memCompactionId); + + UNIT_ASSERT(outcome.Params); + UNIT_ASSERT(outcome.Params->Parts); + UNIT_ASSERT(outcome.Params->IsFinal); + + auto changes = strategy.CompactionFinished( + memCompactionId, std::move(outcome.Params), std::move(outcome.Result)); + + // We expect forced compaction to place results on level 255 + UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 255u); + + // We expect forced compaction to be finished and a new one immediately allowed + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 234); + UNIT_ASSERT(strategy.AllowForcedCompaction()); + } + + // Don't expect any tasks or change requests + UNIT_ASSERT(!broker.HasPending()); + UNIT_ASSERT(!backend.PendingReads); + UNIT_ASSERT(!backend.StartedCompactions); + UNIT_ASSERT(!backend.CheckChangesFlag()); + } + + Y_UNIT_TEST(ForcedCompactionWithGenerations) { + TSimpleBackend backend; + TSimpleBroker broker; + TSimpleLogger logger; + TSimpleTime time; + + // Initialize the schema + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true); + policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true); + policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true); + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + } + + TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix"); + strategy.Start({ }); + + // Don't expect any tasks or change requests + UNIT_ASSERT(!broker.HasPending()); + UNIT_ASSERT(!backend.PendingReads); + UNIT_ASSERT(!backend.StartedCompactions); + UNIT_ASSERT(!backend.CheckChangesFlag()); + + // Insert some rows + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 64; ++key) { + db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42); + } + backend.Commit(); + } + + // Start a forced mem compaction with id 123 + { + auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 123); + UNIT_ASSERT(memCompactionId != 0); + auto outcome = backend.RunCompaction(memCompactionId); + + UNIT_ASSERT(outcome.Params); + UNIT_ASSERT(!outcome.Params->Parts); + UNIT_ASSERT(outcome.Params->IsFinal); + + auto changes = strategy.CompactionFinished( + memCompactionId, std::move(outcome.Params), std::move(outcome.Result)); + + // We expect forced compaction to place results on level 1 + UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u); + + // We expect forced compaction to be finished and a new one immediately allowed + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u); + UNIT_ASSERT(strategy.AllowForcedCompaction()); + } + + // Don't expect any tasks or change requests + UNIT_ASSERT(!broker.HasPending()); + UNIT_ASSERT(!backend.PendingReads); + UNIT_ASSERT(!backend.StartedCompactions); + UNIT_ASSERT(!backend.CheckChangesFlag()); + + // Insert some more rows + { + auto db = backend.Begin(); + for (ui64 key = 64; key < 128; ++key) { + db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42); + } + backend.Commit(); + } + + // Start one more force compaction with id 234 + { + auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 234); + UNIT_ASSERT(memCompactionId != 0); + auto outcome = backend.RunCompaction(memCompactionId); + + UNIT_ASSERT(outcome.Params); + UNIT_ASSERT(!outcome.Params->Parts); + UNIT_ASSERT(!outcome.Params->IsFinal); + + auto changes = strategy.CompactionFinished( + memCompactionId, std::move(outcome.Params), std::move(outcome.Result)); + + // We expect forced compaction to again place results (there are none) on level 1 + UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u); + + // We expect compaction 234 not to be finished yet + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u); + UNIT_ASSERT(!strategy.AllowForcedCompaction()); + } + + // There should be a compaction task pending right now + UNIT_ASSERT(broker.RunPending()); + UNIT_ASSERT(!broker.HasPending()); + + // There should be compaction started right now + UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u); + + // Perform this compaction + { + auto outcome = backend.RunCompaction(); + UNIT_ASSERT(outcome.Params->Parts); + UNIT_ASSERT(outcome.Params->IsFinal); + UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u); + + auto* genParams = CheckedCast<TGenCompactionParams*>(outcome.Params.Get()); + UNIT_ASSERT_VALUES_EQUAL(genParams->Generation, 1u); + + auto changes = strategy.CompactionFinished( + outcome.CompactionId, std::move(outcome.Params), std::move(outcome.Result)); + + // We expect the result to be uplifted to level 1 + UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u); + + // We expect compaction 234 to be finished + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 234u); + UNIT_ASSERT(strategy.AllowForcedCompaction()); + } + } + + Y_UNIT_TEST(ForcedCompactionWithFinalParts) { + TSimpleBackend backend; + TSimpleBroker broker; + TSimpleLogger logger; + TSimpleTime time; + + // Initialize the schema + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + } + + TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix"); + strategy.Start({ }); + + // Insert some rows + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 64; ++key) { + db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy); + + // Alter schema to policy with generations + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true); + policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true); + policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true); + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + strategy.ReflectSchema(); + } + + // Insert some more rows + { + auto db = backend.Begin(); + for (ui64 key = 64; key < 128; ++key) { + db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42); + } + backend.Commit(); + } + + // Start force compaction with id 123 + { + auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 123); + UNIT_ASSERT(memCompactionId != 0); + auto outcome = backend.RunCompaction(memCompactionId); + + UNIT_ASSERT(outcome.Params); + UNIT_ASSERT(!outcome.Params->Parts); + UNIT_ASSERT(!outcome.Params->IsFinal); + + auto changes = strategy.CompactionFinished( + memCompactionId, std::move(outcome.Params), std::move(outcome.Result)); + + // We expect forced compaction to place results before final parts on level 3 + UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 3u); + + // We expect compaction 123 not to be finished yet + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 0u); + UNIT_ASSERT(!strategy.AllowForcedCompaction()); + } + + // There should be a compaction task pending right now + UNIT_ASSERT(broker.RunPending()); + UNIT_ASSERT(!broker.HasPending()); + + // There should be compaction started right now + UNIT_ASSERT_VALUES_EQUAL(backend.StartedCompactions.size(), 1u); + + // Perform this compaction + { + auto outcome = backend.RunCompaction(); + UNIT_ASSERT(outcome.Params->IsFinal); + UNIT_ASSERT_VALUES_EQUAL(outcome.Params->Parts.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(outcome.Result->Parts.size(), 1u); + + auto* genParams = CheckedCast<TGenCompactionParams*>(outcome.Params.Get()); + UNIT_ASSERT_VALUES_EQUAL(genParams->Generation, 3u); + + auto changes = strategy.CompactionFinished( + outcome.CompactionId, std::move(outcome.Params), std::move(outcome.Result)); + + // We expect the result to be uplifted to level 1 + UNIT_ASSERT_VALUES_EQUAL(changes.NewPartsLevel, 1u); + + // We expect compaction 234 to be finished + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionId(), 123u); + UNIT_ASSERT(strategy.AllowForcedCompaction()); + } + } + + Y_UNIT_TEST(ForcedCompactionByDeletedRows) { + TSimpleBackend backend; + TSimpleBroker broker; + TSimpleLogger logger; + TSimpleTime time; + + // Initialize the schema + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + policy.DroppedRowsPercentToCompact = 50; + policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true); + policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true); + policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true); + for (auto& gen : policy.Generations) { + gen.ExtraCompactionPercent = 0; + gen.ExtraCompactionMinSize = 0; + gen.ExtraCompactionExpPercent = 0; + gen.ExtraCompactionExpMaxSize = 0; + } + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + } + + TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix"); + strategy.Start({ }); + + // Insert some rows + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 16; ++key) { + db.Table<Schema::Data>().Key(key).Update<Schema::Data::Value>(42); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy); + + // Erase more than 50% of rows + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 10; ++key) { + db.Table<Schema::Data>().Key(key).Delete(); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 2u); + + // We expect a forced compaction to be pending right now + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionTs(), TInstant()); + UNIT_ASSERT(!strategy.AllowForcedCompaction()); + UNIT_ASSERT(broker.HasPending()); + + time.Move(TInstant::Seconds(60)); + + // finish forced compactions + while (broker.HasPending()) { + UNIT_ASSERT(broker.RunPending()); + if (backend.StartedCompactions.empty()) + continue; + + auto result = backend.RunCompaction(); + auto changes = strategy.CompactionFinished( + result.CompactionId, std::move(result.Params), std::move(result.Result)); + backend.ApplyChanges(Table, std::move(changes)); + } + + UNIT_ASSERT(strategy.AllowForcedCompaction()); + UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionTs(), TInstant::Seconds(60)); + } + }; } // NCompGen diff --git a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp b/ydb/core/tablet_flat/ut/ut_comp_shard.cpp index 525063d30c..c2dba25ab9 100644 --- a/ydb/core/tablet_flat/ut/ut_comp_shard.cpp +++ b/ydb/core/tablet_flat/ut/ut_comp_shard.cpp @@ -972,7 +972,7 @@ Y_UNIT_TEST_SUITE(TShardedCompactionScenarios) { // Start a memtable compaction using this strategy { - auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, false); + auto memCompactionId = strategy.BeginMemCompaction(0, { 0, TEpoch::Max() }, 0); UNIT_ASSERT(memCompactionId != 0); auto outcome = backend.RunCompaction(memCompactionId); |