diff options
author | Alexey Borzenkov <snaury@gmail.com> | 2022-04-20 14:05:23 +0300 |
---|---|---|
committer | Alexey Borzenkov <snaury@gmail.com> | 2022-04-20 14:05:23 +0300 |
commit | 748f6ea01e485802c076897a02bd1647013e4c6d (patch) | |
tree | fbea17c5125587a63aa7f1c4a2773576964b55fb | |
parent | d85809f7b86d53a83de4744b395e61df085e727f (diff) | |
download | ydb-748f6ea01e485802c076897a02bd1647013e4c6d.tar.gz |
Force compaction on too much unreachable mvcc data in otherwise idle shards, KIKIMR-14737
ref:f621e62c1a5a72ee3aa92738dbb2a6d4ba60b53a
-rw-r--r-- | ydb/core/tablet_flat/flat_comp.h | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_comp_gen.cpp | 143 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_comp_gen.h | 12 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_comp_shard.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_comp_shard.h | 1 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_cxx_database.h | 16 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_compaction_logic.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_compaction_logic.h | 1 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_page_gstat.h | 156 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/test_comp.h | 11 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/test_dbase.h | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/flat_comp_ut_common.h | 11 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/ut_comp_gen.cpp | 322 |
14 files changed, 626 insertions, 73 deletions
diff --git a/ydb/core/tablet_flat/flat_comp.h b/ydb/core/tablet_flat/flat_comp.h index 2ee56626091..a768863386e 100644 --- a/ydb/core/tablet_flat/flat_comp.h +++ b/ydb/core/tablet_flat/flat_comp.h @@ -254,6 +254,14 @@ namespace NTable { virtual void ReflectSchema() = 0; /** + * Called after the table changes its removed row versions + * + * Strategy should use this opportunity to recalculate its garbage + * estimation and schedule compactions to free up space. + */ + virtual void ReflectRemovedRowVersions() = 0; + + /** * Called periodically so strategy has a chance to re-evaluate its * current situation (e.g. update pending task priorities). */ diff --git a/ydb/core/tablet_flat/flat_comp_gen.cpp b/ydb/core/tablet_flat/flat_comp_gen.cpp index 47c57fe192a..16ec271bd16 100644 --- a/ydb/core/tablet_flat/flat_comp_gen.cpp +++ b/ydb/core/tablet_flat/flat_comp_gen.cpp @@ -202,11 +202,6 @@ void TGenCompactionStrategy::Start(TCompactionState state) { Policy = scheme->CompactionPolicy; Generations.resize(Policy->Generations.size()); - // Reset garbage version to the minimum - // It will be recalculated in UpdateStats below anyway - CachedGarbageRowVersion = TRowVersion::Min(); - CachedGarbageBytes = 0; - for (auto& partView : Backend->TableParts(Table)) { auto label = partView->Label; ui32 level = state.PartLevels.Value(partView->Label, 255); @@ -246,6 +241,8 @@ void TGenCompactionStrategy::Start(TCompactionState state) { UpdateStats(); + MaybeAutoStartForceCompaction(); + for (ui32 index : xrange(Generations.size())) { CheckGeneration(index + 1); } @@ -340,6 +337,12 @@ void TGenCompactionStrategy::ReflectSchema() { UpdateOverload(); } +void TGenCompactionStrategy::ReflectRemovedRowVersions() { + if (Generations && MaybeAutoStartForceCompaction()) { + CheckGeneration(1); + } +} + float TGenCompactionStrategy::GetOverloadFactor() { return MaxOverloadFactor; } @@ -482,7 +485,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( Y_VERIFY(!FinalParts.empty()); 
auto& front = FinalParts.front(); Y_VERIFY(front.Label == (*partIt)->Label); - CachedGarbageBytes -= front.GarbageBytes; KnownParts.erase(front.Label); FinalParts.pop_front(); --FinalCompactionTaken; @@ -507,7 +509,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( Y_VERIFY(!nextGen.Parts.empty()); auto& front = nextGen.Parts.front(); Y_VERIFY(front.Label == (*partIt)->Label); - CachedGarbageBytes -= front.GarbageBytes; KnownParts.erase(front.Label); nextGen.PopFront(); } @@ -556,7 +557,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( "Failed at gen=%u, sourceIndex=%u, headTaken=%lu", generation, sourceIndex, sourceGen.TakenHeadParts); Y_VERIFY(sourceGen.CompactingTailParts > 0); - CachedGarbageBytes -= part.GarbageBytes; KnownParts.erase(part.Label); sourceGen.PopBack(); sourceParts.pop_back(); @@ -649,11 +649,7 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( if (target == Generations.size()) { for (auto it = newParts.rbegin(); it != newParts.rend(); ++it) { auto& partView = *it; - auto& front = FinalParts.emplace_front(std::move(partView)); - if (CachedGarbageRowVersion && front.PartView->GarbageStats) { - front.GarbageBytes = front.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion); - CachedGarbageBytes += front.GarbageBytes; - } + FinalParts.emplace_front(std::move(partView)); } } else { auto& newGen = Generations[target]; @@ -661,21 +657,13 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( Y_VERIFY(!newGen.Parts || result->Epoch <= newGen.Parts.back().Epoch); for (auto it = newParts.begin(); it != newParts.end(); ++it) { auto& partView = *it; - auto& back = newGen.PushBack(std::move(partView)); - if (CachedGarbageRowVersion && back.PartView->GarbageStats) { - back.GarbageBytes = back.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion); - CachedGarbageBytes += back.GarbageBytes; - } + newGen.PushBack(std::move(partView)); } } else { Y_VERIFY(!newGen.Parts || 
result->Epoch >= newGen.Parts.front().Epoch); for (auto it = newParts.rbegin(); it != newParts.rend(); ++it) { auto& partView = *it; - auto& front = newGen.PushFront(std::move(partView)); - if (CachedGarbageRowVersion && front.PartView->GarbageStats) { - front.GarbageBytes = front.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion); - CachedGarbageBytes += front.GarbageBytes; - } + newGen.PushFront(std::move(partView)); } } @@ -702,11 +690,7 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished( // The previous forced compaction has finished, start gen compactions CurrentForcedGenCompactionId = std::exchange(NextForcedGenCompactionId, 0); startForcedCompaction = true; - } else if (Stats.DroppedRowsPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) { - // Table has too many dropped rows, compact everything - startForcedCompaction = true; - } else if (CachedDroppedBytesPercent >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) { - // Table has too much garbage, compact everything + } else if (NeedToStartForceCompaction()) { startForcedCompaction = true; } @@ -760,7 +744,6 @@ void TGenCompactionStrategy::PartMerged(TPartView partView, ui32 level) { if (it->Label == label) { Stats -= it->Stats; StatsPerTablet[label.TabletID()] -= it->Stats; - CachedGarbageBytes -= it->GarbageBytes; KnownParts.erase(it->Label); FinalParts.erase(it); break; @@ -775,10 +758,12 @@ void TGenCompactionStrategy::PartMerged(TPartView partView, ui32 level) { auto& back = FinalParts.emplace_back(std::move(partView)); Stats += back.Stats; StatsPerTablet[back.Label.TabletID()] += back.Stats; - if (CachedGarbageRowVersion && back.PartView->GarbageStats) { - back.GarbageBytes = back.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion); - CachedGarbageBytes += back.GarbageBytes; - } + + // WARNING: we don't call UpdateStats here, so GarbageStatsAgg is not + // recalculated properly. 
This is not a problem, because this method is + // usually called when parts are borrowed, and we are not supposed to + // have any of our own data at that stage. We would wait for the first + // compaction to properly recalculate garbage stats histogram. } void TGenCompactionStrategy::PartMerged(TIntrusiveConstPtr<TColdPart> part, ui32 level) { @@ -1363,46 +1348,80 @@ void TGenCompactionStrategy::UpdateStats() { StatsPerTablet[part.Label.TabletID()] += part.Stats; } + // This rebuild is pretty expensive, however UpdateStats is only called + // when we start or compact something, so right now it's not a very big + // concern. + // TODO: make it possible to incrementally update this aggregate + { + NPage::TGarbageStatsAggBuilder builder; + auto process = [&](TPartInfo& part) { + if (part.PartView->GarbageStats) { + builder.Add(part.PartView->GarbageStats); + } + }; + for (auto& gen : Generations) { + for (auto& part : gen.Parts) { + process(part); + } + } + for (auto& part : FinalParts) { + process(part); + } + GarbageStatsAgg = builder.Build(); + } +} + +void TGenCompactionStrategy::UpdateOverload() { + MaxOverloadFactor = 0.0; + for (const auto& gen : Generations) { + MaxOverloadFactor = Max(MaxOverloadFactor, gen.OverloadFactor); + } +} + +ui32 TGenCompactionStrategy::DroppedBytesPercent() const { if (const auto& ranges = Backend->TableRemovedRowVersions(Table)) { auto it = ranges.begin(); if (it->Lower.IsMin()) { - // We keep garbage bytes up to date, but when the version - // changes we need to recalculate it for all parts - if (CachedGarbageRowVersion != it->Upper) { - CachedGarbageRowVersion = it->Upper; - CachedGarbageBytes = 0; - auto process = [&](TPartInfo& part) { - if (CachedGarbageRowVersion && part.PartView->GarbageStats) { - part.GarbageBytes = part.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion); - CachedGarbageBytes += part.GarbageBytes; - } else { - part.GarbageBytes = 0; - } - }; - for (auto& gen : Generations) { - for (auto& 
part : gen.Parts) { - process(part); - } - } - for (auto& part : FinalParts) { - process(part); - } + ui64 bytes = GarbageStatsAgg.GetGarbageBytes(it->Upper); + if (bytes > 0 && Stats.BackingSize > 0) { + return bytes * 100 / Stats.BackingSize; } } } - if (CachedGarbageBytes > 0 && Stats.BackingSize > 0) { - CachedDroppedBytesPercent = CachedGarbageBytes * 100 / Stats.BackingSize; - } else { - CachedDroppedBytesPercent = 0; + return 0; +} + +bool TGenCompactionStrategy::NeedToStartForceCompaction() const { + if (Stats.DroppedRowsPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) { + // Table has too many dropped rows, compact everything + return true; } + + if (DroppedBytesPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) { + // Table has too much garbage, compact everything + return true; + } + + return false; } -void TGenCompactionStrategy::UpdateOverload() { - MaxOverloadFactor = 0.0; - for (const auto& gen : Generations) { - MaxOverloadFactor = Max(MaxOverloadFactor, gen.OverloadFactor); +bool TGenCompactionStrategy::MaybeAutoStartForceCompaction() { + // Check if maybe we need to start a forced compaction + // WARNING: we only do this check when we have at least one sst that + // belongs to our own tablet, i.e. not borrowed. This is so we don't + // compact borrowed data mid-merge, which might cause epochs to become + // out of sync across generations. 
+ if (Generations && ForcedState == EForcedState::None && StatsPerTablet.contains(Backend->OwnerTabletId())) { + bool startForcedCompaction = NeedToStartForceCompaction(); + if (startForcedCompaction) { + ForcedState = EForcedState::Pending; + ForcedGeneration = 1; + return true; + } } + + return false; } } diff --git a/ydb/core/tablet_flat/flat_comp_gen.h b/ydb/core/tablet_flat/flat_comp_gen.h index 79f7f689c60..07b39415492 100644 --- a/ydb/core/tablet_flat/flat_comp_gen.h +++ b/ydb/core/tablet_flat/flat_comp_gen.h @@ -1,6 +1,7 @@ #pragma once #include "flat_comp.h" +#include "flat_page_gstat.h" #include <library/cpp/time_provider/time_provider.h> @@ -33,6 +34,7 @@ namespace NCompGen { void Stop() override; void ReflectSchema() override; + void ReflectRemovedRowVersions() override; void UpdateCompactions() override; float GetOverloadFactor() override; ui64 GetBackingSize() override; @@ -158,7 +160,6 @@ namespace NCompGen { const TLogoBlobID Label; const TEpoch Epoch; const TStats Stats; - ui64 GarbageBytes = 0; inline bool operator<(const TPartInfo& other) const { if (other.Epoch != Epoch) { @@ -243,6 +244,10 @@ namespace NCompGen { return (ForcedState == EForcedState::Pending && ForcedGeneration == generation); } + ui32 DroppedBytesPercent() const; + bool NeedToStartForceCompaction() const; + bool MaybeAutoStartForceCompaction(); + private: ui32 const Table; ICompactionBackend* const Backend; @@ -273,10 +278,7 @@ namespace NCompGen { THashMap<TLogoBlobID, ui32> KnownParts; TStats Stats; THashMap<ui64, TStats> StatsPerTablet; - - TRowVersion CachedGarbageRowVersion = TRowVersion::Min(); - ui64 CachedGarbageBytes = 0; - ui32 CachedDroppedBytesPercent = 0; + NPage::TGarbageStatsAgg GarbageStatsAgg; }; } diff --git a/ydb/core/tablet_flat/flat_comp_shard.cpp b/ydb/core/tablet_flat/flat_comp_shard.cpp index b26139877dd..89d765129c4 100644 --- a/ydb/core/tablet_flat/flat_comp_shard.cpp +++ b/ydb/core/tablet_flat/flat_comp_shard.cpp @@ -1009,6 +1009,10 @@ namespace 
NCompShard { CheckCompactions(); } + void TShardedCompactionStrategy::ReflectRemovedRowVersions() { + // nothing + } + void TShardedCompactionStrategy::UpdateCompactions() { CheckCompactions(); } diff --git a/ydb/core/tablet_flat/flat_comp_shard.h b/ydb/core/tablet_flat/flat_comp_shard.h index 115e9ce8596..073a5f731ea 100644 --- a/ydb/core/tablet_flat/flat_comp_shard.h +++ b/ydb/core/tablet_flat/flat_comp_shard.h @@ -559,6 +559,7 @@ namespace NCompShard { void Start(TCompactionState state) override; void Stop() override; void ReflectSchema() override; + void ReflectRemovedRowVersions() override; void UpdateCompactions() override; float GetOverloadFactor() override; ui64 GetBackingSize() override; diff --git a/ydb/core/tablet_flat/flat_cxx_database.h b/ydb/core/tablet_flat/flat_cxx_database.h index 3dfbee66a47..5282500ecd3 100644 --- a/ydb/core/tablet_flat/flat_cxx_database.h +++ b/ydb/core/tablet_flat/flat_cxx_database.h @@ -1971,6 +1971,22 @@ struct Schema { void Delete() { Database->Update(TableId, NTable::ERowOp::Erase, TTupleToRawTypeValue<KeyValuesType, KeyColumnsType>(KeyValues), { }); } + + template <typename... ColumnTypes> + KeyOperations& UpdateV(const TRowVersion& rowVersion, const typename ColumnTypes::Type&... value) { + return UpdateV(rowVersion, TUpdate<ColumnTypes>(value)...); + } + + template <typename... UpdateTypes> + KeyOperations& UpdateV(const TRowVersion& rowVersion, const UpdateTypes&... 
updates) { + std::array<TUpdateOp, sizeof...(UpdateTypes)> update_ops = {{updates...}}; + Database->Update(TableId, NTable::ERowOp::Upsert, TTupleToRawTypeValue<KeyValuesType, KeyColumnsType>(KeyValues), update_ops, rowVersion); + return *this; + } + + void DeleteV(const TRowVersion& rowVersion) { + Database->Update(TableId, NTable::ERowOp::Erase, TTupleToRawTypeValue<KeyValuesType, KeyColumnsType>(KeyValues), { }, rowVersion); + } }; }; diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 7575afd7091..4f6b5de3d1a 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1959,6 +1959,8 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv for (auto& xpair : change->RemovedRowVersions) { const auto tableId = xpair.first; + CompactionLogic->ReflectRemovedRowVersions(tableId); + NKikimrExecutorFlat::TTablePartSwitch proto; proto.SetTableId(tableId); diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp index 3d699e8f87b..b28c5214496 100644 --- a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp +++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp @@ -266,6 +266,14 @@ TReflectSchemeChangesResult TCompactionLogic::ReflectSchemeChanges() return result; } +void TCompactionLogic::ReflectRemovedRowVersions(ui32 table) +{ + auto *tableInfo = State->Tables.FindPtr(table); + if (tableInfo) { + tableInfo->Strategy->ReflectRemovedRowVersions(); + } +} + THolder<NTable::ICompactionStrategy> TCompactionLogic::CreateStrategy( ui32 tableId, NKikimrSchemeOp::ECompactionStrategy strategy) diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.h b/ydb/core/tablet_flat/flat_executor_compaction_logic.h index b85a13a19ec..7993fdb83c0 100644 --- a/ydb/core/tablet_flat/flat_executor_compaction_logic.h +++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.h @@ -218,6 +218,7 
@@ public: TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 table); TReflectSchemeChangesResult ReflectSchemeChanges(); + void ReflectRemovedRowVersions(ui32 table); void UpdateInMemStatsStep(ui32 table, ui32 steps, ui64 size); void CheckInMemStats(ui32 table); void UpdateLogUsage(TArrayRef<const NRedo::TUsage>); diff --git a/ydb/core/tablet_flat/flat_page_gstat.h b/ydb/core/tablet_flat/flat_page_gstat.h index 0ee555a3625..d5eff46141c 100644 --- a/ydb/core/tablet_flat/flat_page_gstat.h +++ b/ydb/core/tablet_flat/flat_page_gstat.h @@ -62,10 +62,18 @@ namespace NPage { Items = { ptr, ptr + header->Items }; } - ui64 Count() const { + size_t Count() const { return Items.size(); } + TRowVersion GetRowVersionAtIndex(size_t index) const { + return Items[index].GetRowVersion(); + } + + ui64 GetGarbageBytesAtIndex(size_t index) const { + return Items[index].GetBytes(); + } + /** * Returns number of bytes that are guaranteed to be freed * if everything up to rowVersion is marked as removed. @@ -233,6 +241,152 @@ namespace NPage { THeapByBytes ByBytes; }; + /** + * An aggregate across multiple TGarbageStats pages, reads are O(log N) + */ + class TGarbageStatsAgg { + private: + friend class TGarbageStatsAggBuilder; + + struct TItem { + ui64 Step; + ui64 TxId; + ui64 Bytes; + + TRowVersion GetRowVersion() const { + return TRowVersion(Step, TxId); + } + + ui64 GetBytes() const { + return Bytes; + } + }; + + public: + TGarbageStatsAgg() = default; + + TGarbageStatsAgg(const TGarbageStatsAgg&) = delete; + TGarbageStatsAgg& operator=(const TGarbageStatsAgg&) = delete; + + TGarbageStatsAgg(TGarbageStatsAgg&&) noexcept = default; + TGarbageStatsAgg& operator=(TGarbageStatsAgg&&) noexcept = default; + + private: + TGarbageStatsAgg(TVector<TItem>&& items) + : Items(std::move(items)) + { } + + public: + /** + * Returns number of bytes that are guaranteed to be freed + * if everything up to rowVersion is marked as removed. 
+ */ + ui64 GetGarbageBytes(const TRowVersion& rowVersion) const { + if (Items.empty()) { + return 0; + } + + auto cmp = [](const TRowVersion& rowVersion, const TItem& item) -> bool { + return rowVersion < item.GetRowVersion(); + }; + + // First item with version > rowVersion + auto it = std::upper_bound(Items.begin(), Items.end(), rowVersion, cmp); + if (it == Items.begin()) { + return 0; + } + + // First item with version <= rowVersion + return (--it)->GetBytes(); + } + + private: + TVector<TItem> Items; + }; + + class TGarbageStatsAggBuilder { + private: + using TItem = TGarbageStatsAgg::TItem; + + public: + TGarbageStatsAggBuilder() = default; + + /** + * Adds a delta bytes that is guaranteed to be freed when everything + * up to rowVersion is marked as removed. + */ + void Add(const TRowVersion& rowVersion, ui64 bytes) { + Items.push_back(TItem{ rowVersion.Step, rowVersion.TxId, bytes }); + } + + /** + * Adds all data in the given garbage stats page to the aggregate. + */ + void Add(const TGarbageStats* stats) { + ui64 prev = 0; + size_t count = stats->Count(); + for (size_t index = 0; index < count; ++index) { + TRowVersion rowVersion = stats->GetRowVersionAtIndex(index); + ui64 bytes = stats->GetGarbageBytesAtIndex(index); + Add(rowVersion, bytes - prev); + prev = bytes; + } + } + + /** + * Adds all data in the given garbage stats page to the aggregate. 
+ */ + void Add(const TIntrusiveConstPtr<TGarbageStats>& stats) { + Add(stats.Get()); + } + + /** + * Builds an aggregate object, destroying builder in the process + */ + TGarbageStatsAgg Build() { + if (!Items.empty()) { + auto cmp = [](const TItem& a, const TItem& b) -> bool { + return a.GetRowVersion() < b.GetRowVersion(); + }; + + // Sort data by row version + std::sort(Items.begin(), Items.end(), cmp); + + // Compute aggregate bytes + ui64 bytes = 0; + auto dst = Items.begin(); + for (auto src = Items.begin(); src != Items.end(); ++src) { + bytes += src->GetBytes(); + if (dst != Items.begin()) { + auto last = std::prev(dst); + if (last->GetRowVersion() == src->GetRowVersion()) { + last->Bytes = bytes; + continue; + } + } + if (dst != src) { + *dst = *src; + } + dst->Bytes = bytes; + ++dst; + } + + if (dst != Items.end()) { + Items.erase(dst, Items.end()); + } + + Items.shrink_to_fit(); + } + + TGarbageStatsAgg agg(std::move(Items)); + Items.clear(); + return agg; + } + + private: + TVector<TItem> Items; + }; + } // namespace NPage } // namespace NTable } // namespace NKikimr diff --git a/ydb/core/tablet_flat/test/libs/table/test_comp.h b/ydb/core/tablet_flat/test/libs/table/test_comp.h index 1f6d83b29f2..8e3bf7fa1f0 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_comp.h +++ b/ydb/core/tablet_flat/test/libs/table/test_comp.h @@ -4,6 +4,7 @@ #include "test_writer.h" #include <ydb/core/tablet_flat/util_basics.h> +#include <ydb/core/tablet_flat/flat_row_versions.h> #include <ydb/core/tablet_flat/flat_table_subset.h> #include <ydb/core/tablet_flat/flat_scan_feed.h> #include <ydb/core/tablet_flat/test/libs/rows/layout.h> @@ -47,6 +48,11 @@ namespace NTest { } + TCompaction& WithRemovedRowVersions(const TRowVersionRanges& ranges) { + RemovedRowVersions = ranges.Snapshot(); + return *this; + } + TPartEggs Do(TIntrusiveConstPtr<TMemTable> table) { return Do(TSubset(TEpoch::Zero(), table->Scheme, { TMemTableSnapshot{ table, table->Immediate() } })); @@ -164,6 
+170,10 @@ namespace NTest { EScan Feed(const TRow &row, TRowVersion &rowVersion) noexcept override { + if (RemovedRowVersions) { + rowVersion = RemovedRowVersions.AdjustDown(rowVersion); + } + Writer->AddKeyVersion(row, rowVersion); return Failed = 0, EScan::Feed; @@ -194,6 +204,7 @@ namespace NTest { TAutoPtr<IPages> Env; TVector<ui32> Tags; TAutoPtr<TPartWriter> Writer; + TRowVersionRanges::TSnapshot RemovedRowVersions; }; } diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h index 06bede14c9b..b0bd726c6cc 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h +++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h @@ -197,7 +197,9 @@ namespace NTest { TAutoPtr<IPages> env = new TForwardEnv(128, 256, keys, Max<ui32>()); - auto eggs = TCompaction(env, conf).Do(*subset, logo); + auto eggs = TCompaction(env, conf) + .WithRemovedRowVersions(Base->GetRemovedRowVersions(table)) + .Do(*subset, logo); Y_VERIFY(!eggs.NoResult(), "Unexpected early termination"); diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h index ab7dfb8dde4..b772b186332 100644 --- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h +++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h @@ -40,7 +40,7 @@ public: } ui64 OwnerTabletId() const override { - return 123; + return TabletId; } const TScheme& DatabaseScheme() override { @@ -181,9 +181,11 @@ public: TAutoPtr<IPages> env = new TTestEnv; // Template for new blobs - TLogoBlobID logo(123, Gen, ++Step, 0, 0, 0); + TLogoBlobID logo(TabletId, Gen, ++Step, 0, 0, 0); - auto eggs = TCompaction(env, conf).Do(*subset, logo); + auto eggs = TCompaction(env, conf) + .WithRemovedRowVersions(DB.GetRemovedRowVersions(params->Table)) + .Do(*subset, logo); TVector<TPartView> parts(Reserve(eggs.Parts.size())); for (auto& part : eggs.Parts) { @@ -312,7 +314,7 @@ private: void SwitchGen() { ++Gen; Step = 0; - Annex.Reset(new 
NPageCollection::TSteppedCookieAllocator(123, ui64(Gen) << 32, { 0, 999 }, {{ 1, 7 }})); + Annex.Reset(new NPageCollection::TSteppedCookieAllocator(TabletId, ui64(Gen) << 32, { 0, 999 }, {{ 1, 7 }})); } public: @@ -321,6 +323,7 @@ public: THashMap<ui64, THolder<ICompactionRead>> PendingReads; THashMap<ui64, THolder<TCompactionParams>> StartedCompactions; THashMap<ui32, THashMap<ui64, TString>> TableState; + ui64 TabletId = 123; private: THolder<NPageCollection::TSteppedCookieAllocator> Annex; diff --git a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp index de2a3caf07f..d9d5c194a0f 100644 --- a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp +++ b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp @@ -485,6 +485,328 @@ Y_UNIT_TEST_SUITE(TGenCompaction) { UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionTs(), TInstant::Seconds(60)); } + Y_UNIT_TEST(ForcedCompactionByUnreachableMvccData) { + TSimpleBackend backend; + TSimpleBroker broker; + TSimpleLogger logger; + TSimpleTime time; + ui64 performedCompactions = 0; + + // Initialize the schema + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + policy.DroppedRowsPercentToCompact = 50; + policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true); + policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true); + policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true); + for (auto& gen : policy.Generations) { + gen.ExtraCompactionPercent = 0; + gen.ExtraCompactionMinSize = 0; + gen.ExtraCompactionExpPercent = 0; + gen.ExtraCompactionExpMaxSize = 0; + } + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + } + + TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix"); + strategy.Start({ }); + + // Insert some rows at v1 + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 16; 
++key) { + db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(1, 1), 42); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy); + + // Delete all rows at v2 + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 16; ++key) { + db.Table<Schema::Data>().Key(key).DeleteV(TRowVersion(2, 2)); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 2u); + + // We expect a forced compaction to be pending right now + UNIT_ASSERT(!strategy.AllowForcedCompaction()); + UNIT_ASSERT(broker.HasPending()); + + // Finish forced compactions + while (broker.HasPending()) { + UNIT_ASSERT(broker.RunPending()); + if (backend.StartedCompactions.empty()) + continue; + + UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions"); + auto result = backend.RunCompaction(); + auto changes = strategy.CompactionFinished( + result.CompactionId, std::move(result.Params), std::move(result.Result)); + backend.ApplyChanges(Table, std::move(changes)); + } + + // Everything should be compacted to a single part (erased data still visible) + UNIT_ASSERT(strategy.AllowForcedCompaction()); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u); + + // Delete all versions from minimum up to almost v2 + { + backend.Begin(); + backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion::Min(), TRowVersion(2, 1)); + backend.Commit(); + } + + // Notify strategy about removed row versions change + strategy.ReflectRemovedRowVersions(); + + // Nothing should be pending at this time, because all data is still visible + UNIT_ASSERT(strategy.AllowForcedCompaction()); + UNIT_ASSERT(!broker.HasPending()); + + // Delete all versions from almost v2 up to v2 + { + backend.Begin(); + backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion(2, 1), TRowVersion(2, 2)); + backend.Commit(); + } + + // Notify strategy about removed row versions change + 
strategy.ReflectRemovedRowVersions(); + + // We expect a forced compaction to be pending right now + UNIT_ASSERT(!strategy.AllowForcedCompaction()); + UNIT_ASSERT(broker.HasPending()); + + // Finish forced compactions + while (broker.HasPending()) { + UNIT_ASSERT(broker.RunPending()); + if (backend.StartedCompactions.empty()) + continue; + + UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions"); + auto result = backend.RunCompaction(); + auto changes = strategy.CompactionFinished( + result.CompactionId, std::move(result.Params), std::move(result.Result)); + backend.ApplyChanges(Table, std::move(changes)); + } + + // Table should become completely empty + UNIT_ASSERT(strategy.AllowForcedCompaction()); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 0u); + } + + Y_UNIT_TEST(ForcedCompactionByUnreachableMvccDataRestart) { + TSimpleBackend backend; + TSimpleBroker broker; + TSimpleLogger logger; + TSimpleTime time; + ui64 performedCompactions = 0; + + // Initialize the schema + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + policy.DroppedRowsPercentToCompact = 50; + policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true); + policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true); + policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true); + for (auto& gen : policy.Generations) { + gen.ExtraCompactionPercent = 0; + gen.ExtraCompactionMinSize = 0; + gen.ExtraCompactionExpPercent = 0; + gen.ExtraCompactionExpMaxSize = 0; + } + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + } + + TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix"); + strategy.Start({ }); + + // Insert some rows at v1 + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 16; ++key) { + 
db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(1, 1), 42); + } + backend.Commit(); + } + + // Delete all rows at v2 + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 16; ++key) { + db.Table<Schema::Data>().Key(key).DeleteV(TRowVersion(2, 2)); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u); + + // We expect nothing to be pending right now + UNIT_ASSERT(strategy.AllowForcedCompaction()); + UNIT_ASSERT(!broker.HasPending()); + + strategy.Stop(); + + // Delete all versions from minimum up to v2 + { + backend.Begin(); + backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion::Min(), TRowVersion(2, 2)); + backend.Commit(); + } + + // Start a new strategy + TGenCompactionStrategy strategy2(Table, &backend, &broker, &time, "suffix"); + strategy2.Start({ }); + + // We expect a forced compaction to be pending right now + UNIT_ASSERT(!strategy2.AllowForcedCompaction()); + UNIT_ASSERT(broker.HasPending()); + + // Finish forced compactions + while (broker.HasPending()) { + UNIT_ASSERT(broker.RunPending()); + if (backend.StartedCompactions.empty()) + continue; + + UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions"); + auto result = backend.RunCompaction(); + auto changes = strategy2.CompactionFinished( + result.CompactionId, std::move(result.Params), std::move(result.Result)); + backend.ApplyChanges(Table, std::move(changes)); + } + + // Table should become completely empty + UNIT_ASSERT(strategy2.AllowForcedCompaction()); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 0u); + } + + Y_UNIT_TEST(ForcedCompactionByUnreachableMvccDataBorrowed) { + TSimpleBackend backend; + TSimpleBroker broker; + TSimpleLogger logger; + TSimpleTime time; + ui64 performedCompactions = 0; + + // Initialize the schema + { + auto db = backend.Begin(); + db.Materialize<Schema>(); + + TCompactionPolicy policy; + 
policy.DroppedRowsPercentToCompact = 50; + policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true); + policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true); + policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true); + for (auto& gen : policy.Generations) { + gen.ExtraCompactionPercent = 0; + gen.ExtraCompactionMinSize = 0; + gen.ExtraCompactionExpPercent = 0; + gen.ExtraCompactionExpMaxSize = 0; + } + backend.DB.Alter().SetCompactionPolicy(Table, policy); + + backend.Commit(); + } + + TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix"); + strategy.Start({ }); + + // Insert some rows at v1 + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 16; ++key) { + db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(1, 1), 42); + } + backend.Commit(); + } + + // Delete all rows at v2 + { + auto db = backend.Begin(); + for (ui64 key = 0; key < 16; ++key) { + db.Table<Schema::Data>().Key(key).DeleteV(TRowVersion(2, 2)); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u); + + // We expect nothing to be pending right now + UNIT_ASSERT(strategy.AllowForcedCompaction()); + UNIT_ASSERT(!broker.HasPending()); + + strategy.Stop(); + + // Delete all versions from minimum up to v2 + { + backend.Begin(); + backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion::Min(), TRowVersion(2, 2)); + backend.Commit(); + } + + // Change tablet id so all data would be treated as borrowed + backend.TabletId++; + + // Start a new strategy + TGenCompactionStrategy strategy2(Table, &backend, &broker, &time, "suffix"); + strategy2.Start({ }); + + // We expect nothing to be pending right now + UNIT_ASSERT(strategy2.AllowForcedCompaction()); + UNIT_ASSERT(!broker.HasPending()); + + // Insert a single row at v3 + { + auto db = 
backend.Begin(); + for (ui64 key = 16; key < 17; ++key) { + db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(3, 3), 42); + } + backend.Commit(); + } + + backend.SimpleMemCompaction(&strategy2); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 2u); + + // We expect a forced compaction to be pending right now + UNIT_ASSERT(!strategy2.AllowForcedCompaction()); + UNIT_ASSERT(broker.HasPending()); + + // Finish forced compactions + while (broker.HasPending()) { + UNIT_ASSERT(broker.RunPending()); + if (backend.StartedCompactions.empty()) + continue; + + UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions"); + auto result = backend.RunCompaction(); + auto changes = strategy2.CompactionFinished( + result.CompactionId, std::move(result.Params), std::move(result.Result)); + backend.ApplyChanges(Table, std::move(changes)); + } + + // Table should be left with a single sst + UNIT_ASSERT(strategy2.AllowForcedCompaction()); + UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u); + } + }; } // NCompGen |