aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@gmail.com>2022-04-20 14:05:23 +0300
committerAlexey Borzenkov <snaury@gmail.com>2022-04-20 14:05:23 +0300
commit748f6ea01e485802c076897a02bd1647013e4c6d (patch)
treefbea17c5125587a63aa7f1c4a2773576964b55fb
parentd85809f7b86d53a83de4744b395e61df085e727f (diff)
downloadydb-748f6ea01e485802c076897a02bd1647013e4c6d.tar.gz
Force compaction on too much unreachable mvcc data in otherwise idle shards, KIKIMR-14737
ref:f621e62c1a5a72ee3aa92738dbb2a6d4ba60b53a
-rw-r--r--ydb/core/tablet_flat/flat_comp.h8
-rw-r--r--ydb/core/tablet_flat/flat_comp_gen.cpp143
-rw-r--r--ydb/core/tablet_flat/flat_comp_gen.h12
-rw-r--r--ydb/core/tablet_flat/flat_comp_shard.cpp4
-rw-r--r--ydb/core/tablet_flat/flat_comp_shard.h1
-rw-r--r--ydb/core/tablet_flat/flat_cxx_database.h16
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp2
-rw-r--r--ydb/core/tablet_flat/flat_executor_compaction_logic.cpp8
-rw-r--r--ydb/core/tablet_flat/flat_executor_compaction_logic.h1
-rw-r--r--ydb/core/tablet_flat/flat_page_gstat.h156
-rw-r--r--ydb/core/tablet_flat/test/libs/table/test_comp.h11
-rw-r--r--ydb/core/tablet_flat/test/libs/table/test_dbase.h4
-rw-r--r--ydb/core/tablet_flat/ut/flat_comp_ut_common.h11
-rw-r--r--ydb/core/tablet_flat/ut/ut_comp_gen.cpp322
14 files changed, 626 insertions, 73 deletions
diff --git a/ydb/core/tablet_flat/flat_comp.h b/ydb/core/tablet_flat/flat_comp.h
index 2ee56626091..a768863386e 100644
--- a/ydb/core/tablet_flat/flat_comp.h
+++ b/ydb/core/tablet_flat/flat_comp.h
@@ -254,6 +254,14 @@ namespace NTable {
virtual void ReflectSchema() = 0;
/**
+ * Called after the table changes its removed row versions
+ *
+ * Strategy should use this opportunity to recalculate its garbage
+ * estimation and schedule compactions to free up space.
+ */
+ virtual void ReflectRemovedRowVersions() = 0;
+
+ /**
* Called periodically so strategy has a chance to re-evaluate its
* current situation (e.g. update pending task priorities).
*/
diff --git a/ydb/core/tablet_flat/flat_comp_gen.cpp b/ydb/core/tablet_flat/flat_comp_gen.cpp
index 47c57fe192a..16ec271bd16 100644
--- a/ydb/core/tablet_flat/flat_comp_gen.cpp
+++ b/ydb/core/tablet_flat/flat_comp_gen.cpp
@@ -202,11 +202,6 @@ void TGenCompactionStrategy::Start(TCompactionState state) {
Policy = scheme->CompactionPolicy;
Generations.resize(Policy->Generations.size());
- // Reset garbage version to the minimum
- // It will be recalculated in UpdateStats below anyway
- CachedGarbageRowVersion = TRowVersion::Min();
- CachedGarbageBytes = 0;
-
for (auto& partView : Backend->TableParts(Table)) {
auto label = partView->Label;
ui32 level = state.PartLevels.Value(partView->Label, 255);
@@ -246,6 +241,8 @@ void TGenCompactionStrategy::Start(TCompactionState state) {
UpdateStats();
+ MaybeAutoStartForceCompaction();
+
for (ui32 index : xrange(Generations.size())) {
CheckGeneration(index + 1);
}
@@ -340,6 +337,12 @@ void TGenCompactionStrategy::ReflectSchema() {
UpdateOverload();
}
+void TGenCompactionStrategy::ReflectRemovedRowVersions() {
+ if (Generations && MaybeAutoStartForceCompaction()) {
+ CheckGeneration(1);
+ }
+}
+
float TGenCompactionStrategy::GetOverloadFactor() {
return MaxOverloadFactor;
}
@@ -482,7 +485,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
Y_VERIFY(!FinalParts.empty());
auto& front = FinalParts.front();
Y_VERIFY(front.Label == (*partIt)->Label);
- CachedGarbageBytes -= front.GarbageBytes;
KnownParts.erase(front.Label);
FinalParts.pop_front();
--FinalCompactionTaken;
@@ -507,7 +509,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
Y_VERIFY(!nextGen.Parts.empty());
auto& front = nextGen.Parts.front();
Y_VERIFY(front.Label == (*partIt)->Label);
- CachedGarbageBytes -= front.GarbageBytes;
KnownParts.erase(front.Label);
nextGen.PopFront();
}
@@ -556,7 +557,6 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
"Failed at gen=%u, sourceIndex=%u, headTaken=%lu",
generation, sourceIndex, sourceGen.TakenHeadParts);
Y_VERIFY(sourceGen.CompactingTailParts > 0);
- CachedGarbageBytes -= part.GarbageBytes;
KnownParts.erase(part.Label);
sourceGen.PopBack();
sourceParts.pop_back();
@@ -649,11 +649,7 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
if (target == Generations.size()) {
for (auto it = newParts.rbegin(); it != newParts.rend(); ++it) {
auto& partView = *it;
- auto& front = FinalParts.emplace_front(std::move(partView));
- if (CachedGarbageRowVersion && front.PartView->GarbageStats) {
- front.GarbageBytes = front.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion);
- CachedGarbageBytes += front.GarbageBytes;
- }
+ FinalParts.emplace_front(std::move(partView));
}
} else {
auto& newGen = Generations[target];
@@ -661,21 +657,13 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
Y_VERIFY(!newGen.Parts || result->Epoch <= newGen.Parts.back().Epoch);
for (auto it = newParts.begin(); it != newParts.end(); ++it) {
auto& partView = *it;
- auto& back = newGen.PushBack(std::move(partView));
- if (CachedGarbageRowVersion && back.PartView->GarbageStats) {
- back.GarbageBytes = back.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion);
- CachedGarbageBytes += back.GarbageBytes;
- }
+ newGen.PushBack(std::move(partView));
}
} else {
Y_VERIFY(!newGen.Parts || result->Epoch >= newGen.Parts.front().Epoch);
for (auto it = newParts.rbegin(); it != newParts.rend(); ++it) {
auto& partView = *it;
- auto& front = newGen.PushFront(std::move(partView));
- if (CachedGarbageRowVersion && front.PartView->GarbageStats) {
- front.GarbageBytes = front.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion);
- CachedGarbageBytes += front.GarbageBytes;
- }
+ newGen.PushFront(std::move(partView));
}
}
@@ -702,11 +690,7 @@ TCompactionChanges TGenCompactionStrategy::CompactionFinished(
// The previous forced compaction has finished, start gen compactions
CurrentForcedGenCompactionId = std::exchange(NextForcedGenCompactionId, 0);
startForcedCompaction = true;
- } else if (Stats.DroppedRowsPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) {
- // Table has too many dropped rows, compact everything
- startForcedCompaction = true;
- } else if (CachedDroppedBytesPercent >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) {
- // Table has too much garbage, compact everything
+ } else if (NeedToStartForceCompaction()) {
startForcedCompaction = true;
}
@@ -760,7 +744,6 @@ void TGenCompactionStrategy::PartMerged(TPartView partView, ui32 level) {
if (it->Label == label) {
Stats -= it->Stats;
StatsPerTablet[label.TabletID()] -= it->Stats;
- CachedGarbageBytes -= it->GarbageBytes;
KnownParts.erase(it->Label);
FinalParts.erase(it);
break;
@@ -775,10 +758,12 @@ void TGenCompactionStrategy::PartMerged(TPartView partView, ui32 level) {
auto& back = FinalParts.emplace_back(std::move(partView));
Stats += back.Stats;
StatsPerTablet[back.Label.TabletID()] += back.Stats;
- if (CachedGarbageRowVersion && back.PartView->GarbageStats) {
- back.GarbageBytes = back.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion);
- CachedGarbageBytes += back.GarbageBytes;
- }
+
+ // WARNING: we don't call UpdateStats here, so GarbageStatsAgg is not
+ // recalculated properly. This is not a problem, because this method is
+ // usually called when parts are borrowed, and we are not supposed to
+ // have any of our own data at that stage. We would wait for the first
+ // compaction to properly recalculate garbage stats histogram.
}
void TGenCompactionStrategy::PartMerged(TIntrusiveConstPtr<TColdPart> part, ui32 level) {
@@ -1363,46 +1348,80 @@ void TGenCompactionStrategy::UpdateStats() {
StatsPerTablet[part.Label.TabletID()] += part.Stats;
}
+ // This rebuild is pretty expensive, however UpdateStats is only called
+ // when we start or compact something, so right now it's not a very big
+ // concern.
+ // TODO: make it possible to incrementally update this aggregate
+ {
+ NPage::TGarbageStatsAggBuilder builder;
+ auto process = [&](TPartInfo& part) {
+ if (part.PartView->GarbageStats) {
+ builder.Add(part.PartView->GarbageStats);
+ }
+ };
+ for (auto& gen : Generations) {
+ for (auto& part : gen.Parts) {
+ process(part);
+ }
+ }
+ for (auto& part : FinalParts) {
+ process(part);
+ }
+ GarbageStatsAgg = builder.Build();
+ }
+}
+
+void TGenCompactionStrategy::UpdateOverload() {
+ MaxOverloadFactor = 0.0;
+ for (const auto& gen : Generations) {
+ MaxOverloadFactor = Max(MaxOverloadFactor, gen.OverloadFactor);
+ }
+}
+
+ui32 TGenCompactionStrategy::DroppedBytesPercent() const {
if (const auto& ranges = Backend->TableRemovedRowVersions(Table)) {
auto it = ranges.begin();
if (it->Lower.IsMin()) {
- // We keep garbage bytes up to date, but when the version
- // changes we need to recalculate it for all parts
- if (CachedGarbageRowVersion != it->Upper) {
- CachedGarbageRowVersion = it->Upper;
- CachedGarbageBytes = 0;
- auto process = [&](TPartInfo& part) {
- if (CachedGarbageRowVersion && part.PartView->GarbageStats) {
- part.GarbageBytes = part.PartView->GarbageStats->GetGarbageBytes(CachedGarbageRowVersion);
- CachedGarbageBytes += part.GarbageBytes;
- } else {
- part.GarbageBytes = 0;
- }
- };
- for (auto& gen : Generations) {
- for (auto& part : gen.Parts) {
- process(part);
- }
- }
- for (auto& part : FinalParts) {
- process(part);
- }
+ ui64 bytes = GarbageStatsAgg.GetGarbageBytes(it->Upper);
+ if (bytes > 0 && Stats.BackingSize > 0) {
+ return bytes * 100 / Stats.BackingSize;
}
}
}
- if (CachedGarbageBytes > 0 && Stats.BackingSize > 0) {
- CachedDroppedBytesPercent = CachedGarbageBytes * 100 / Stats.BackingSize;
- } else {
- CachedDroppedBytesPercent = 0;
+ return 0;
+}
+
+bool TGenCompactionStrategy::NeedToStartForceCompaction() const {
+ if (Stats.DroppedRowsPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) {
+ // Table has too many dropped rows, compact everything
+ return true;
}
+
+ if (DroppedBytesPercent() >= Policy->DroppedRowsPercentToCompact && !Policy->KeepEraseMarkers) {
+ // Table has too much garbage, compact everything
+ return true;
+ }
+
+ return false;
}
-void TGenCompactionStrategy::UpdateOverload() {
- MaxOverloadFactor = 0.0;
- for (const auto& gen : Generations) {
- MaxOverloadFactor = Max(MaxOverloadFactor, gen.OverloadFactor);
+bool TGenCompactionStrategy::MaybeAutoStartForceCompaction() {
+ // Check if maybe we need to start a forced compaction
+ // WARNING: we only do this check when we have at least one sst that
+ // belongs to our own tablet, i.e. not borrowed. This is so we don't
+ // compact borrowed data mid-merge, which might cause epochs to become
+ // out of sync across generations.
+ if (Generations && ForcedState == EForcedState::None && StatsPerTablet.contains(Backend->OwnerTabletId())) {
+ bool startForcedCompaction = NeedToStartForceCompaction();
+ if (startForcedCompaction) {
+ ForcedState = EForcedState::Pending;
+ ForcedGeneration = 1;
+ return true;
+ }
}
+
+ return false;
}
}
diff --git a/ydb/core/tablet_flat/flat_comp_gen.h b/ydb/core/tablet_flat/flat_comp_gen.h
index 79f7f689c60..07b39415492 100644
--- a/ydb/core/tablet_flat/flat_comp_gen.h
+++ b/ydb/core/tablet_flat/flat_comp_gen.h
@@ -1,6 +1,7 @@
#pragma once
#include "flat_comp.h"
+#include "flat_page_gstat.h"
#include <library/cpp/time_provider/time_provider.h>
@@ -33,6 +34,7 @@ namespace NCompGen {
void Stop() override;
void ReflectSchema() override;
+ void ReflectRemovedRowVersions() override;
void UpdateCompactions() override;
float GetOverloadFactor() override;
ui64 GetBackingSize() override;
@@ -158,7 +160,6 @@ namespace NCompGen {
const TLogoBlobID Label;
const TEpoch Epoch;
const TStats Stats;
- ui64 GarbageBytes = 0;
inline bool operator<(const TPartInfo& other) const {
if (other.Epoch != Epoch) {
@@ -243,6 +244,10 @@ namespace NCompGen {
return (ForcedState == EForcedState::Pending && ForcedGeneration == generation);
}
+ ui32 DroppedBytesPercent() const;
+ bool NeedToStartForceCompaction() const;
+ bool MaybeAutoStartForceCompaction();
+
private:
ui32 const Table;
ICompactionBackend* const Backend;
@@ -273,10 +278,7 @@ namespace NCompGen {
THashMap<TLogoBlobID, ui32> KnownParts;
TStats Stats;
THashMap<ui64, TStats> StatsPerTablet;
-
- TRowVersion CachedGarbageRowVersion = TRowVersion::Min();
- ui64 CachedGarbageBytes = 0;
- ui32 CachedDroppedBytesPercent = 0;
+ NPage::TGarbageStatsAgg GarbageStatsAgg;
};
}
diff --git a/ydb/core/tablet_flat/flat_comp_shard.cpp b/ydb/core/tablet_flat/flat_comp_shard.cpp
index b26139877dd..89d765129c4 100644
--- a/ydb/core/tablet_flat/flat_comp_shard.cpp
+++ b/ydb/core/tablet_flat/flat_comp_shard.cpp
@@ -1009,6 +1009,10 @@ namespace NCompShard {
CheckCompactions();
}
+ void TShardedCompactionStrategy::ReflectRemovedRowVersions() {
+ // nothing
+ }
+
void TShardedCompactionStrategy::UpdateCompactions() {
CheckCompactions();
}
diff --git a/ydb/core/tablet_flat/flat_comp_shard.h b/ydb/core/tablet_flat/flat_comp_shard.h
index 115e9ce8596..073a5f731ea 100644
--- a/ydb/core/tablet_flat/flat_comp_shard.h
+++ b/ydb/core/tablet_flat/flat_comp_shard.h
@@ -559,6 +559,7 @@ namespace NCompShard {
void Start(TCompactionState state) override;
void Stop() override;
void ReflectSchema() override;
+ void ReflectRemovedRowVersions() override;
void UpdateCompactions() override;
float GetOverloadFactor() override;
ui64 GetBackingSize() override;
diff --git a/ydb/core/tablet_flat/flat_cxx_database.h b/ydb/core/tablet_flat/flat_cxx_database.h
index 3dfbee66a47..5282500ecd3 100644
--- a/ydb/core/tablet_flat/flat_cxx_database.h
+++ b/ydb/core/tablet_flat/flat_cxx_database.h
@@ -1971,6 +1971,22 @@ struct Schema {
void Delete() {
Database->Update(TableId, NTable::ERowOp::Erase, TTupleToRawTypeValue<KeyValuesType, KeyColumnsType>(KeyValues), { });
}
+
+ template <typename... ColumnTypes>
+ KeyOperations& UpdateV(const TRowVersion& rowVersion, const typename ColumnTypes::Type&... value) {
+ return UpdateV(rowVersion, TUpdate<ColumnTypes>(value)...);
+ }
+
+ template <typename... UpdateTypes>
+ KeyOperations& UpdateV(const TRowVersion& rowVersion, const UpdateTypes&... updates) {
+ std::array<TUpdateOp, sizeof...(UpdateTypes)> update_ops = {{updates...}};
+ Database->Update(TableId, NTable::ERowOp::Upsert, TTupleToRawTypeValue<KeyValuesType, KeyColumnsType>(KeyValues), update_ops, rowVersion);
+ return *this;
+ }
+
+ void DeleteV(const TRowVersion& rowVersion) {
+ Database->Update(TableId, NTable::ERowOp::Erase, TTupleToRawTypeValue<KeyValuesType, KeyColumnsType>(KeyValues), { }, rowVersion);
+ }
};
};
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 7575afd7091..4f6b5de3d1a 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -1959,6 +1959,8 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
for (auto& xpair : change->RemovedRowVersions) {
const auto tableId = xpair.first;
+ CompactionLogic->ReflectRemovedRowVersions(tableId);
+
NKikimrExecutorFlat::TTablePartSwitch proto;
proto.SetTableId(tableId);
diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
index 3d699e8f87b..b28c5214496 100644
--- a/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
+++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.cpp
@@ -266,6 +266,14 @@ TReflectSchemeChangesResult TCompactionLogic::ReflectSchemeChanges()
return result;
}
+void TCompactionLogic::ReflectRemovedRowVersions(ui32 table)
+{
+ auto *tableInfo = State->Tables.FindPtr(table);
+ if (tableInfo) {
+ tableInfo->Strategy->ReflectRemovedRowVersions();
+ }
+}
+
THolder<NTable::ICompactionStrategy> TCompactionLogic::CreateStrategy(
ui32 tableId,
NKikimrSchemeOp::ECompactionStrategy strategy)
diff --git a/ydb/core/tablet_flat/flat_executor_compaction_logic.h b/ydb/core/tablet_flat/flat_executor_compaction_logic.h
index b85a13a19ec..7993fdb83c0 100644
--- a/ydb/core/tablet_flat/flat_executor_compaction_logic.h
+++ b/ydb/core/tablet_flat/flat_executor_compaction_logic.h
@@ -218,6 +218,7 @@ public:
TFinishedCompactionInfo GetFinishedCompactionInfo(ui32 table);
TReflectSchemeChangesResult ReflectSchemeChanges();
+ void ReflectRemovedRowVersions(ui32 table);
void UpdateInMemStatsStep(ui32 table, ui32 steps, ui64 size);
void CheckInMemStats(ui32 table);
void UpdateLogUsage(TArrayRef<const NRedo::TUsage>);
diff --git a/ydb/core/tablet_flat/flat_page_gstat.h b/ydb/core/tablet_flat/flat_page_gstat.h
index 0ee555a3625..d5eff46141c 100644
--- a/ydb/core/tablet_flat/flat_page_gstat.h
+++ b/ydb/core/tablet_flat/flat_page_gstat.h
@@ -62,10 +62,18 @@ namespace NPage {
Items = { ptr, ptr + header->Items };
}
- ui64 Count() const {
+ size_t Count() const {
return Items.size();
}
+ TRowVersion GetRowVersionAtIndex(size_t index) const {
+ return Items[index].GetRowVersion();
+ }
+
+ ui64 GetGarbageBytesAtIndex(size_t index) const {
+ return Items[index].GetBytes();
+ }
+
/**
* Returns number of bytes that are guaranteed to be freed
* if everything up to rowVersion is marked as removed.
@@ -233,6 +241,152 @@ namespace NPage {
THeapByBytes ByBytes;
};
+ /**
+ * An aggregate across multiple TGarbageStats pages, reads are O(log N)
+ */
+ class TGarbageStatsAgg {
+ private:
+ friend class TGarbageStatsAggBuilder;
+
+ struct TItem {
+ ui64 Step;
+ ui64 TxId;
+ ui64 Bytes;
+
+ TRowVersion GetRowVersion() const {
+ return TRowVersion(Step, TxId);
+ }
+
+ ui64 GetBytes() const {
+ return Bytes;
+ }
+ };
+
+ public:
+ TGarbageStatsAgg() = default;
+
+ TGarbageStatsAgg(const TGarbageStatsAgg&) = delete;
+ TGarbageStatsAgg& operator=(const TGarbageStatsAgg&) = delete;
+
+ TGarbageStatsAgg(TGarbageStatsAgg&&) noexcept = default;
+ TGarbageStatsAgg& operator=(TGarbageStatsAgg&&) noexcept = default;
+
+ private:
+ TGarbageStatsAgg(TVector<TItem>&& items)
+ : Items(std::move(items))
+ { }
+
+ public:
+ /**
+ * Returns number of bytes that are guaranteed to be freed
+ * if everything up to rowVersion is marked as removed.
+ */
+ ui64 GetGarbageBytes(const TRowVersion& rowVersion) const {
+ if (Items.empty()) {
+ return 0;
+ }
+
+ auto cmp = [](const TRowVersion& rowVersion, const TItem& item) -> bool {
+ return rowVersion < item.GetRowVersion();
+ };
+
+ // First item with version > rowVersion
+ auto it = std::upper_bound(Items.begin(), Items.end(), rowVersion, cmp);
+ if (it == Items.begin()) {
+ return 0;
+ }
+
+ // First item with version <= rowVersion
+ return (--it)->GetBytes();
+ }
+
+ private:
+ TVector<TItem> Items;
+ };
+
+ class TGarbageStatsAggBuilder {
+ private:
+ using TItem = TGarbageStatsAgg::TItem;
+
+ public:
+ TGarbageStatsAggBuilder() = default;
+
+ /**
+ * Adds a byte delta that is guaranteed to be freed when everything
+ * up to rowVersion is marked as removed.
+ */
+ void Add(const TRowVersion& rowVersion, ui64 bytes) {
+ Items.push_back(TItem{ rowVersion.Step, rowVersion.TxId, bytes });
+ }
+
+ /**
+ * Adds all data in the given garbage stats page to the aggregate.
+ */
+ void Add(const TGarbageStats* stats) {
+ ui64 prev = 0;
+ size_t count = stats->Count();
+ for (size_t index = 0; index < count; ++index) {
+ TRowVersion rowVersion = stats->GetRowVersionAtIndex(index);
+ ui64 bytes = stats->GetGarbageBytesAtIndex(index);
+ Add(rowVersion, bytes - prev);
+ prev = bytes;
+ }
+ }
+
+ /**
+ * Adds all data in the given garbage stats page to the aggregate.
+ */
+ void Add(const TIntrusiveConstPtr<TGarbageStats>& stats) {
+ Add(stats.Get());
+ }
+
+ /**
+ * Builds an aggregate object, destroying builder in the process
+ */
+ TGarbageStatsAgg Build() {
+ if (!Items.empty()) {
+ auto cmp = [](const TItem& a, const TItem& b) -> bool {
+ return a.GetRowVersion() < b.GetRowVersion();
+ };
+
+ // Sort data by row version
+ std::sort(Items.begin(), Items.end(), cmp);
+
+ // Compute aggregate bytes
+ ui64 bytes = 0;
+ auto dst = Items.begin();
+ for (auto src = Items.begin(); src != Items.end(); ++src) {
+ bytes += src->GetBytes();
+ if (dst != Items.begin()) {
+ auto last = std::prev(dst);
+ if (last->GetRowVersion() == src->GetRowVersion()) {
+ last->Bytes = bytes;
+ continue;
+ }
+ }
+ if (dst != src) {
+ *dst = *src;
+ }
+ dst->Bytes = bytes;
+ ++dst;
+ }
+
+ if (dst != Items.end()) {
+ Items.erase(dst, Items.end());
+ }
+
+ Items.shrink_to_fit();
+ }
+
+ TGarbageStatsAgg agg(std::move(Items));
+ Items.clear();
+ return agg;
+ }
+
+ private:
+ TVector<TItem> Items;
+ };
+
} // namespace NPage
} // namespace NTable
} // namespace NKikimr
diff --git a/ydb/core/tablet_flat/test/libs/table/test_comp.h b/ydb/core/tablet_flat/test/libs/table/test_comp.h
index 1f6d83b29f2..8e3bf7fa1f0 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_comp.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_comp.h
@@ -4,6 +4,7 @@
#include "test_writer.h"
#include <ydb/core/tablet_flat/util_basics.h>
+#include <ydb/core/tablet_flat/flat_row_versions.h>
#include <ydb/core/tablet_flat/flat_table_subset.h>
#include <ydb/core/tablet_flat/flat_scan_feed.h>
#include <ydb/core/tablet_flat/test/libs/rows/layout.h>
@@ -47,6 +48,11 @@ namespace NTest {
}
+ TCompaction& WithRemovedRowVersions(const TRowVersionRanges& ranges) {
+ RemovedRowVersions = ranges.Snapshot();
+ return *this;
+ }
+
TPartEggs Do(TIntrusiveConstPtr<TMemTable> table)
{
return Do(TSubset(TEpoch::Zero(), table->Scheme, { TMemTableSnapshot{ table, table->Immediate() } }));
@@ -164,6 +170,10 @@ namespace NTest {
EScan Feed(const TRow &row, TRowVersion &rowVersion) noexcept override
{
+ if (RemovedRowVersions) {
+ rowVersion = RemovedRowVersions.AdjustDown(rowVersion);
+ }
+
Writer->AddKeyVersion(row, rowVersion);
return Failed = 0, EScan::Feed;
@@ -194,6 +204,7 @@ namespace NTest {
TAutoPtr<IPages> Env;
TVector<ui32> Tags;
TAutoPtr<TPartWriter> Writer;
+ TRowVersionRanges::TSnapshot RemovedRowVersions;
};
}
diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
index 06bede14c9b..b0bd726c6cc 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
@@ -197,7 +197,9 @@ namespace NTest {
TAutoPtr<IPages> env = new TForwardEnv(128, 256, keys, Max<ui32>());
- auto eggs = TCompaction(env, conf).Do(*subset, logo);
+ auto eggs = TCompaction(env, conf)
+ .WithRemovedRowVersions(Base->GetRemovedRowVersions(table))
+ .Do(*subset, logo);
Y_VERIFY(!eggs.NoResult(), "Unexpected early termination");
diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
index ab7dfb8dde4..b772b186332 100644
--- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
+++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
@@ -40,7 +40,7 @@ public:
}
ui64 OwnerTabletId() const override {
- return 123;
+ return TabletId;
}
const TScheme& DatabaseScheme() override {
@@ -181,9 +181,11 @@ public:
TAutoPtr<IPages> env = new TTestEnv;
// Template for new blobs
- TLogoBlobID logo(123, Gen, ++Step, 0, 0, 0);
+ TLogoBlobID logo(TabletId, Gen, ++Step, 0, 0, 0);
- auto eggs = TCompaction(env, conf).Do(*subset, logo);
+ auto eggs = TCompaction(env, conf)
+ .WithRemovedRowVersions(DB.GetRemovedRowVersions(params->Table))
+ .Do(*subset, logo);
TVector<TPartView> parts(Reserve(eggs.Parts.size()));
for (auto& part : eggs.Parts) {
@@ -312,7 +314,7 @@ private:
void SwitchGen() {
++Gen;
Step = 0;
- Annex.Reset(new NPageCollection::TSteppedCookieAllocator(123, ui64(Gen) << 32, { 0, 999 }, {{ 1, 7 }}));
+ Annex.Reset(new NPageCollection::TSteppedCookieAllocator(TabletId, ui64(Gen) << 32, { 0, 999 }, {{ 1, 7 }}));
}
public:
@@ -321,6 +323,7 @@ public:
THashMap<ui64, THolder<ICompactionRead>> PendingReads;
THashMap<ui64, THolder<TCompactionParams>> StartedCompactions;
THashMap<ui32, THashMap<ui64, TString>> TableState;
+ ui64 TabletId = 123;
private:
THolder<NPageCollection::TSteppedCookieAllocator> Annex;
diff --git a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
index de2a3caf07f..d9d5c194a0f 100644
--- a/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
+++ b/ydb/core/tablet_flat/ut/ut_comp_gen.cpp
@@ -485,6 +485,328 @@ Y_UNIT_TEST_SUITE(TGenCompaction) {
UNIT_ASSERT_VALUES_EQUAL(strategy.GetLastFinishedForcedCompactionTs(), TInstant::Seconds(60));
}
+ Y_UNIT_TEST(ForcedCompactionByUnreachableMvccData) {
+ TSimpleBackend backend;
+ TSimpleBroker broker;
+ TSimpleLogger logger;
+ TSimpleTime time;
+ ui64 performedCompactions = 0;
+
+ // Initialize the schema
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ policy.DroppedRowsPercentToCompact = 50;
+ policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true);
+ policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true);
+ policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true);
+ for (auto& gen : policy.Generations) {
+ gen.ExtraCompactionPercent = 0;
+ gen.ExtraCompactionMinSize = 0;
+ gen.ExtraCompactionExpPercent = 0;
+ gen.ExtraCompactionExpMaxSize = 0;
+ }
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ }
+
+ TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix");
+ strategy.Start({ });
+
+ // Insert some rows at v1
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 16; ++key) {
+ db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(1, 1), 42);
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy);
+
+ // Delete all rows at v2
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 16; ++key) {
+ db.Table<Schema::Data>().Key(key).DeleteV(TRowVersion(2, 2));
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy);
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 2u);
+
+ // We expect a forced compaction to be pending right now
+ UNIT_ASSERT(!strategy.AllowForcedCompaction());
+ UNIT_ASSERT(broker.HasPending());
+
+ // Finish forced compactions
+ while (broker.HasPending()) {
+ UNIT_ASSERT(broker.RunPending());
+ if (backend.StartedCompactions.empty())
+ continue;
+
+ UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions");
+ auto result = backend.RunCompaction();
+ auto changes = strategy.CompactionFinished(
+ result.CompactionId, std::move(result.Params), std::move(result.Result));
+ backend.ApplyChanges(Table, std::move(changes));
+ }
+
+ // Everything should be compacted to a single part (erased data still visible)
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u);
+
+ // Delete all versions from minimum up to almost v2
+ {
+ backend.Begin();
+ backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion::Min(), TRowVersion(2, 1));
+ backend.Commit();
+ }
+
+ // Notify strategy about removed row versions change
+ strategy.ReflectRemovedRowVersions();
+
+ // Nothing should be pending at this time, because all data is still visible
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ UNIT_ASSERT(!broker.HasPending());
+
+ // Delete all versions from almost v2 up to v2
+ {
+ backend.Begin();
+ backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion(2, 1), TRowVersion(2, 2));
+ backend.Commit();
+ }
+
+ // Notify strategy about removed row versions change
+ strategy.ReflectRemovedRowVersions();
+
+ // We expect a forced compaction to be pending right now
+ UNIT_ASSERT(!strategy.AllowForcedCompaction());
+ UNIT_ASSERT(broker.HasPending());
+
+ // Finish forced compactions
+ while (broker.HasPending()) {
+ UNIT_ASSERT(broker.RunPending());
+ if (backend.StartedCompactions.empty())
+ continue;
+
+ UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions");
+ auto result = backend.RunCompaction();
+ auto changes = strategy.CompactionFinished(
+ result.CompactionId, std::move(result.Params), std::move(result.Result));
+ backend.ApplyChanges(Table, std::move(changes));
+ }
+
+ // Table should become completely empty
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 0u);
+ }
+
+ Y_UNIT_TEST(ForcedCompactionByUnreachableMvccDataRestart) {
+ TSimpleBackend backend;
+ TSimpleBroker broker;
+ TSimpleLogger logger;
+ TSimpleTime time;
+ ui64 performedCompactions = 0;
+
+ // Initialize the schema
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ policy.DroppedRowsPercentToCompact = 50;
+ policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true);
+ policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true);
+ policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true);
+ for (auto& gen : policy.Generations) {
+ gen.ExtraCompactionPercent = 0;
+ gen.ExtraCompactionMinSize = 0;
+ gen.ExtraCompactionExpPercent = 0;
+ gen.ExtraCompactionExpMaxSize = 0;
+ }
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ }
+
+ TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix");
+ strategy.Start({ });
+
+ // Insert some rows at v1
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 16; ++key) {
+ db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(1, 1), 42);
+ }
+ backend.Commit();
+ }
+
+ // Delete all rows at v2
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 16; ++key) {
+ db.Table<Schema::Data>().Key(key).DeleteV(TRowVersion(2, 2));
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy);
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u);
+
+ // We expect nothing to be pending right now
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ UNIT_ASSERT(!broker.HasPending());
+
+ strategy.Stop();
+
+ // Delete all versions from minimum up to v2
+ {
+ backend.Begin();
+ backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion::Min(), TRowVersion(2, 2));
+ backend.Commit();
+ }
+
+ // Start a new strategy
+ TGenCompactionStrategy strategy2(Table, &backend, &broker, &time, "suffix");
+ strategy2.Start({ });
+
+ // We expect a forced compaction to be pending right now
+ UNIT_ASSERT(!strategy2.AllowForcedCompaction());
+ UNIT_ASSERT(broker.HasPending());
+
+ // Finish forced compactions
+ while (broker.HasPending()) {
+ UNIT_ASSERT(broker.RunPending());
+ if (backend.StartedCompactions.empty())
+ continue;
+
+ UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions");
+ auto result = backend.RunCompaction();
+ auto changes = strategy2.CompactionFinished(
+ result.CompactionId, std::move(result.Params), std::move(result.Result));
+ backend.ApplyChanges(Table, std::move(changes));
+ }
+
+ // Table should become completely empty
+ UNIT_ASSERT(strategy2.AllowForcedCompaction());
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 0u);
+ }
+
+ Y_UNIT_TEST(ForcedCompactionByUnreachableMvccDataBorrowed) {
+ TSimpleBackend backend;
+ TSimpleBroker broker;
+ TSimpleLogger logger;
+ TSimpleTime time;
+ ui64 performedCompactions = 0;
+
+ // Initialize the schema
+ {
+ auto db = backend.Begin();
+ db.Materialize<Schema>();
+
+ TCompactionPolicy policy;
+ policy.DroppedRowsPercentToCompact = 50;
+ policy.Generations.emplace_back(10 * 1024 * 1024, 2, 10, 100 * 1024 * 1024, "compact_gen1", true);
+ policy.Generations.emplace_back(100 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen2", true);
+ policy.Generations.emplace_back(200 * 1024 * 1024, 2, 10, 200 * 1024 * 1024, "compact_gen3", true);
+ for (auto& gen : policy.Generations) {
+ gen.ExtraCompactionPercent = 0;
+ gen.ExtraCompactionMinSize = 0;
+ gen.ExtraCompactionExpPercent = 0;
+ gen.ExtraCompactionExpMaxSize = 0;
+ }
+ backend.DB.Alter().SetCompactionPolicy(Table, policy);
+
+ backend.Commit();
+ }
+
+ TGenCompactionStrategy strategy(Table, &backend, &broker, &time, "suffix");
+ strategy.Start({ });
+
+ // Insert some rows at v1
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 16; ++key) {
+ db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(1, 1), 42);
+ }
+ backend.Commit();
+ }
+
+ // Delete all rows at v2
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 0; key < 16; ++key) {
+ db.Table<Schema::Data>().Key(key).DeleteV(TRowVersion(2, 2));
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy);
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u);
+
+ // We expect nothing to be pending right now
+ UNIT_ASSERT(strategy.AllowForcedCompaction());
+ UNIT_ASSERT(!broker.HasPending());
+
+ strategy.Stop();
+
+ // Delete all versions from minimum up to v2
+ {
+ backend.Begin();
+ backend.DB.RemoveRowVersions(Schema::Data::TableId, TRowVersion::Min(), TRowVersion(2, 2));
+ backend.Commit();
+ }
+
+ // Change tablet id so all data would be treated as borrowed
+ backend.TabletId++;
+
+ // Start a new strategy
+ TGenCompactionStrategy strategy2(Table, &backend, &broker, &time, "suffix");
+ strategy2.Start({ });
+
+ // We expect nothing to be pending right now
+ UNIT_ASSERT(strategy2.AllowForcedCompaction());
+ UNIT_ASSERT(!broker.HasPending());
+
+ // Insert a single row at v3
+ {
+ auto db = backend.Begin();
+ for (ui64 key = 16; key < 17; ++key) {
+ db.Table<Schema::Data>().Key(key).UpdateV<Schema::Data::Value>(TRowVersion(3, 3), 42);
+ }
+ backend.Commit();
+ }
+
+ backend.SimpleMemCompaction(&strategy2);
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 2u);
+
+ // We expect a forced compaction to be pending right now
+ UNIT_ASSERT(!strategy2.AllowForcedCompaction());
+ UNIT_ASSERT(broker.HasPending());
+
+ // Finish forced compactions
+ while (broker.HasPending()) {
+ UNIT_ASSERT(broker.RunPending());
+ if (backend.StartedCompactions.empty())
+ continue;
+
+ UNIT_ASSERT_C(performedCompactions++ < 100, "too many compactions");
+ auto result = backend.RunCompaction();
+ auto changes = strategy2.CompactionFinished(
+ result.CompactionId, std::move(result.Params), std::move(result.Result));
+ backend.ApplyChanges(Table, std::move(changes));
+ }
+
+ // Table should be left with a single sst
+ UNIT_ASSERT(strategy2.AllowForcedCompaction());
+ UNIT_ASSERT_VALUES_EQUAL(backend.TableParts(Table).size(), 1u);
+ }
+
};
} // NCompGen