aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@ydb.tech>2025-02-18 20:26:27 +0300
committerGitHub <noreply@github.com>2025-02-18 20:26:27 +0300
commiteb1eed1cc26be0126ffacee14074898a2ad2b4b2 (patch)
treea3ca6675cf73db27c68835de3b9933be1e3dc8f3
parente31ed432d1637866af870abf13b08bd3610f5ebc (diff)
downloadydb-eb1eed1cc26be0126ffacee14074898a2ad2b4b2.tar.gz
Avoid persistent and in-memory tx status getting out-of-sync (#14704)
-rw-r--r--ydb/core/tablet_flat/flat_boot_stages.h3
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp27
-rw-r--r--ydb/core/tablet_flat/flat_database.h11
-rw-r--r--ydb/core/tablet_flat/flat_dbase_naked.h31
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp35
-rw-r--r--ydb/core/tablet_flat/flat_executor_misc.h7
-rw-r--r--ydb/core/tablet_flat/flat_executor_ut.cpp94
-rw-r--r--ydb/core/tablet_flat/flat_mem_warm.h11
-rw-r--r--ydb/core/tablet_flat/flat_ops_compact.h53
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp297
-rw-r--r--ydb/core/tablet_flat/flat_table.h38
-rw-r--r--ydb/core/tablet_flat/flat_table_subset.h1
-rw-r--r--ydb/core/tablet_flat/test/libs/table/test_dbase.h7
-rw-r--r--ydb/core/tablet_flat/ut/flat_comp_ut_common.h8
14 files changed, 481 insertions, 142 deletions
diff --git a/ydb/core/tablet_flat/flat_boot_stages.h b/ydb/core/tablet_flat/flat_boot_stages.h
index 8fde38a5e7..d796970800 100644
--- a/ydb/core/tablet_flat/flat_boot_stages.h
+++ b/ydb/core/tablet_flat/flat_boot_stages.h
@@ -194,6 +194,9 @@ namespace NBoot {
const auto was = Back->DatabaseImpl->Rewind(Back->Serial);
+ // Notify database that all merges have completed
+ Back->DatabaseImpl->MergeDone();
+
result.Database = new NTable::TDatabase(Back->DatabaseImpl.Release());
if (auto logl = Env->Logger()->Log(ELnLev::Info)) {
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index acc5bd968a..68bc5b416b 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -572,9 +572,14 @@ TEpoch TDatabase::TxSnapTable(ui32 table)
return DatabaseImpl->FlushTable(table);
}
-TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const
+TAutoPtr<TSubset> TDatabase::CompactionSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle) const
{
- return Require(table)->Subset(bundle, before);
+ return Require(table)->CompactionSubset(before, bundle);
+}
+
+TAutoPtr<TSubset> TDatabase::PartSwitchSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus) const
+{
+ return Require(table)->PartSwitchSubset(before, bundle, txStatus);
}
TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const
@@ -617,14 +622,13 @@ void TDatabase::ReplaceSlices(ui32 table, TBundleSlicesMap slices)
return DatabaseImpl->ReplaceSlices(table, std::move(slices));
}
-void TDatabase::Replace(ui32 table, TArrayRef<const TPartView> partViews, const TSubset &subset)
+void TDatabase::Replace(
+ ui32 table,
+ const TSubset& subset,
+ TArrayRef<const TPartView> newParts,
+ TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus)
{
- return DatabaseImpl->Replace(table, partViews, subset);
-}
-
-void TDatabase::ReplaceTxStatus(ui32 table, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> txStatus, const TSubset &subset)
-{
- return DatabaseImpl->ReplaceTxStatus(table, txStatus, subset);
+ return DatabaseImpl->Replace(table, subset, newParts, newTxStatus);
}
void TDatabase::Merge(ui32 table, TPartView partView)
@@ -642,6 +646,11 @@ void TDatabase::Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart> txStatus)
return DatabaseImpl->Merge(table, std::move(txStatus));
}
+void TDatabase::MergeDone(ui32 table)
+{
+ return DatabaseImpl->MergeDone(table);
+}
+
TAlter& TDatabase::Alter()
{
Y_ABORT_UNLESS(Redo, "Scheme change must be done within a transaction");
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 373e50f371..9f086987ea 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -226,7 +226,8 @@ public:
void UpdateApproximateFreeSharesByChannel(const THashMap<ui32, float>& approximateFreeSpaceShareByChannel);
TString SnapshotToLog(ui32 table, TTxStamp);
- TAutoPtr<TSubset> Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const;
+ TAutoPtr<TSubset> CompactionSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle) const;
+ TAutoPtr<TSubset> PartSwitchSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus) const;
TAutoPtr<TSubset> Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const;
TAutoPtr<TSubset> ScanSnapshot(ui32 table, TRowVersion snapshot = TRowVersion::Max());
@@ -235,11 +236,15 @@ public:
TBundleSlicesMap LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const;
void ReplaceSlices(ui32 table, TBundleSlicesMap slices);
- void Replace(ui32 table, TArrayRef<const TPartView>, const TSubset&);
- void ReplaceTxStatus(ui32 table, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>, const TSubset&);
+ void Replace(
+ ui32 table,
+ const TSubset&,
+ TArrayRef<const TPartView>,
+ TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>);
void Merge(ui32 table, TPartView);
void Merge(ui32 table, TIntrusiveConstPtr<TColdPart>);
void Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart>);
+ void MergeDone(ui32 table);
void DebugDumpTable(ui32 table, IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const;
void DebugDump(IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const;
diff --git a/ydb/core/tablet_flat/flat_dbase_naked.h b/ydb/core/tablet_flat/flat_dbase_naked.h
index 043fd7a386..339dba9656 100644
--- a/ydb/core/tablet_flat/flat_dbase_naked.h
+++ b/ydb/core/tablet_flat/flat_dbase_naked.h
@@ -481,51 +481,62 @@ namespace NTable {
wrap.Aggr(Stats, true /* enter */);
}
- void Replace(ui32 tid, TArrayRef<const TPartView> partViews, const TSubset &subset) noexcept
+ void Replace(
+ ui32 tid,
+ const TSubset &subset,
+ TArrayRef<const TPartView> newParts,
+ TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus) noexcept
{
auto &wrap = Get(tid, true);
wrap.Aggr(Stats, false /* leave */);
- wrap->Replace(partViews, subset);
+ wrap->Replace(subset, newParts, newTxStatus);
wrap.Aggr(Stats, true /* enter */);
}
- void ReplaceTxStatus(ui32 tid, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> txStatus, const TSubset &subset) noexcept
+ void Merge(ui32 tid, TPartView partView) noexcept
{
auto &wrap = Get(tid, true);
wrap.Aggr(Stats, false /* leave */);
- wrap->ReplaceTxStatus(txStatus, subset);
+ wrap->Merge(std::move(partView));
wrap.Aggr(Stats, true /* enter */);
}
- void Merge(ui32 tid, TPartView partView) noexcept
+ void Merge(ui32 tid, TIntrusiveConstPtr<TColdPart> part) noexcept
{
auto &wrap = Get(tid, true);
wrap.Aggr(Stats, false /* leave */);
- wrap->Merge(std::move(partView));
+ wrap->Merge(std::move(part));
wrap.Aggr(Stats, true /* enter */);
}
- void Merge(ui32 tid, TIntrusiveConstPtr<TColdPart> part) noexcept
+ void Merge(ui32 tid, TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
{
auto &wrap = Get(tid, true);
wrap.Aggr(Stats, false /* leave */);
- wrap->Merge(std::move(part));
+ wrap->Merge(std::move(txStatus));
wrap.Aggr(Stats, true /* enter */);
}
- void Merge(ui32 tid, TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
+ void MergeDone(ui32 tid) noexcept
{
auto &wrap = Get(tid, true);
wrap.Aggr(Stats, false /* leave */);
- wrap->Merge(std::move(txStatus));
+ wrap->MergeDone();
wrap.Aggr(Stats, true /* enter */);
}
+ void MergeDone() noexcept
+ {
+ for (auto &pr : Tables) {
+ MergeDone(pr.first);
+ }
+ }
+
bool ApplySchema(const TSchemeChanges &delta)
{
TModifier modifier(*Scheme);
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 9caaa1bfac..4c4109064b 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -1479,7 +1479,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
if (partSwitch.FollowerUpdateStep) {
- auto subset = Database->Subset(partSwitch.TableId, partSwitch.Leaving, partSwitch.Head);
+ auto subset = Database->PartSwitchSubset(partSwitch.TableId, partSwitch.Head, partSwitch.Leaving, partSwitch.LeavingTxStatus);
if (partSwitch.Head != subset->Head) {
Y_ABORT("Follower table epoch head has diverged from leader");
@@ -1488,16 +1488,17 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
Y_ABORT_UNLESS(newColdParts.empty(), "Unexpected cold part at a follower");
- Database->Replace(partSwitch.TableId, std::move(newParts), *subset);
- Database->ReplaceTxStatus(partSwitch.TableId, std::move(newTxStatus), *subset);
+ Database->Replace(partSwitch.TableId, *subset, std::move(newParts), std::move(newTxStatus));
for (auto &gone : subset->Flatten)
DropCachesOfBundle(*gone);
Send(Owner->Tablet(), new TEvTablet::TEvFGcAck(Owner->TabletID(), Generation(), partSwitch.FollowerUpdateStep));
} else {
+ bool merged = false;
for (auto &partView : newParts) {
Database->Merge(partSwitch.TableId, partView);
+ merged = true;
if (CompactionLogic) {
CompactionLogic->BorrowedPart(partSwitch.TableId, std::move(partView));
@@ -1505,6 +1506,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
for (auto &part : newColdParts) {
Database->Merge(partSwitch.TableId, part);
+ merged = true;
if (CompactionLogic) {
CompactionLogic->BorrowedPart(partSwitch.TableId, std::move(part));
@@ -1512,6 +1514,10 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
for (auto &txStatus : newTxStatus) {
Database->Merge(partSwitch.TableId, txStatus);
+ merged = true;
+ }
+ if (merged) {
+ Database->MergeDone(partSwitch.TableId);
}
}
@@ -1533,7 +1539,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
// N.B. there should be a single source table per part switch
for (auto& [sourceTable, state] : perTable) {
// Rebase source parts to their respective new epochs
- auto srcSubset = Database->Subset(sourceTable, state.Bundles, NTable::TEpoch::Zero());
+ auto srcSubset = Database->PartSwitchSubset(sourceTable, NTable::TEpoch::Zero(), state.Bundles, { });
TVector<NTable::TPartView> rebased(Reserve(srcSubset->Flatten.size()));
for (const auto& partView : srcSubset->Flatten) {
Y_ABORT_UNLESS(!partView->TxIdStats, "Cannot move parts with uncommitted deltas");
@@ -1542,7 +1548,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
// Remove source parts from the source table
- Database->Replace(sourceTable, { }, *srcSubset);
+ Database->Replace(sourceTable, *srcSubset, { }, { });
if (CompactionLogic) {
CompactionLogic->RemovedParts(sourceTable, state.Bundles);
@@ -1557,6 +1563,8 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) {
}
}
}
+
+ Database->MergeDone(partSwitch.TableId);
}
}
@@ -2310,7 +2318,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
}
// Remove source parts from the source table
- Database->Replace(src, { }, *srcSubset);
+ Database->Replace(src, *srcSubset, { }, { });
const auto logicResult = CompactionLogic->RemovedParts(src, labels);
@@ -2342,6 +2350,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
Database->Merge(dst, partView);
CompactionLogic->BorrowedPart(dst, partView);
}
+ Database->MergeDone(dst);
// Serialize rebased parts as moved from the source table
NKikimrExecutorFlat::TTablePartSwitch proto;
@@ -3050,7 +3059,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable::
TAutoPtr<NTable::TSubset> subset;
if (params) {
- subset = Database->Subset(table, { }, params->Edge.Head);
+ subset = Database->CompactionSubset(table, params->Edge.Head, { });
if (params->Parts) {
subset->Flatten.insert(subset->Flatten.end(), params->Parts.begin(), params->Parts.end());
@@ -3397,8 +3406,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)
newParts.emplace_back(result.Part);
}
- Database->Replace(tableId, newParts, *ops->Subset);
- Database->ReplaceTxStatus(tableId, newTxStatus, *ops->Subset);
+ Database->Replace(tableId, *ops->Subset, newParts, newTxStatus);
TVector<TLogoBlobID> bundles(Reserve(ops->Subset->Flatten.size() + ops->Subset->ColdParts.size()));
for (auto &part: ops->Subset->Flatten) {
@@ -4525,23 +4533,28 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params)
if (!memTableSnapshot->GetCommittedTransactions().empty() || !memTableSnapshot->GetRemovedTransactions().empty()) {
// We must compact tx status when mem table has changes
compactTxStatus = true;
+ break;
}
}
for (const auto& txStatus : snapshot->Subset->TxStatus) {
if (txStatus->Label.TabletID() != Owner->TabletID()) {
// We want to compact borrowed tx status
compactTxStatus = true;
+ break;
}
}
+ if (snapshot->Subset->TxStatus && snapshot->Subset->GarbageTransactions) {
+ // We want to remove garbage transactions
+ compactTxStatus = true;
+ }
if (compactTxStatus) {
- comp->CommittedTransactions = snapshot->Subset->CommittedTransactions;
- comp->RemovedTransactions = snapshot->Subset->RemovedTransactions;
comp->Frozen.reserve(snapshot->Subset->Frozen.size());
for (auto& memTableSnapshot : snapshot->Subset->Frozen) {
comp->Frozen.push_back(memTableSnapshot.MemTable);
}
comp->TxStatus = snapshot->Subset->TxStatus;
+ comp->GarbageTransactions = snapshot->Subset->GarbageTransactions;
} else {
// We are not compacting tx status, avoid deleting current blobs
snapshot->Subset->TxStatus.clear();
diff --git a/ydb/core/tablet_flat/flat_executor_misc.h b/ydb/core/tablet_flat/flat_executor_misc.h
index 4f92584078..bca5f23c64 100644
--- a/ydb/core/tablet_flat/flat_executor_misc.h
+++ b/ydb/core/tablet_flat/flat_executor_misc.h
@@ -34,12 +34,11 @@ namespace NTabletFlatExecutor {
THolder<NTable::TCompactionParams> Params;
NTable::TRowVersionRanges::TSnapshot RemovedRowVersions;
- // Non-empty when compaction also needs to write a tx status table part
- NTable::TTransactionMap CommittedTransactions;
- NTable::TTransactionSet RemovedTransactions;
- // The above may contain extra keys, these allow them to be narrowed
+ // Non-empty when compaction also needs to produce a tx status table part
TVector<TIntrusiveConstPtr<NTable::TMemTable>> Frozen;
TVector<TIntrusiveConstPtr<NTable::TTxStatusPart>> TxStatus;
+ // Non-empty for transactions that no longer need their status maintained
+ NTable::TTransactionSet GarbageTransactions;
};
}
diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp
index 0df3e2b59e..ac8ba17da0 100644
--- a/ydb/core/tablet_flat/flat_executor_ut.cpp
+++ b/ydb/core/tablet_flat/flat_executor_ut.cpp
@@ -3984,6 +3984,18 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTx) {
const TIntrusiveConstPtr<TCompactionPolicy> Policy;
};
+ struct TTxWaitCompleted : public ITransaction {
+ TTxWaitCompleted() = default;
+
+ bool Execute(TTransactionContext&, const TActorContext&) override {
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ }
+ };
+
struct TTxCommitLongTx : public ITransaction {
ui64 TxId;
TRowVersion RowVersion;
@@ -4004,6 +4016,26 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTx) {
}
};
+ struct TTxHasTxData : public ITransaction {
+ const ui64 TxId;
+ bool& Result;
+
+ explicit TTxHasTxData(ui64 txId, bool& result)
+ : TxId(txId)
+ , Result(result)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ Result = txc.DB.HasTxData(TableId, TxId);
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // nothing
+ }
+ };
+
template<ui32 ColumnId>
struct TTxWriteRow : public ITransaction {
i64 Key;
@@ -4712,6 +4744,68 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTx) {
}
}
+ Y_UNIT_TEST(CompactedTxIdReuse) {
+ TMyEnvBase env;
+
+ env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG);
+ env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
+
+ env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
+ return new TTestFlatTablet(env.Edge, tablet, info);
+ });
+ env.WaitForWakeUp();
+
+ env.SendSync(new NFake::TEvExecute{ new TTxInitSchema });
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(1, "foo", 123) });
+ env.SendSync(new NFake::TEvExecute{ new TTxCommitLongTx(123) });
+ env.SendSync(new NFake::TEvExecute{ new TTxWaitCompleted() });
+
+ bool hasTxData = false;
+ env.SendSync(new NFake::TEvExecute{ new TTxHasTxData(123, hasTxData) });
+ UNIT_ASSERT_C(hasTxData, "Expected HasTxData = true before compaction");
+
+ // Compact until executor tells us txId no longer has any data
+ for (int i = 0; i < 2 && hasTxData; ++i) {
+ // Force a mem table compaction
+ Cerr << "...compacting" << Endl;
+ env.SendSync(new NFake::TEvCompact(TableId, true));
+ Cerr << "...waiting until compacted" << Endl;
+ env.WaitFor<NFake::TEvCompacted>();
+ env.SendSync(new NFake::TEvExecute{ new TTxHasTxData(123, hasTxData) });
+ Cerr << "...hasTxData = " << hasTxData << Endl;
+ }
+ UNIT_ASSERT_C(!hasTxData, "Expected HasTxData = false after multiple compactions");
+
+ // Add an additional row
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(2, "bar", 123) });
+ env.SendSync(new NFake::TEvExecute{ new TTxWaitCompleted() });
+
+ // We must not see the uncommitted row
+ {
+ TString data;
+ env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) });
+ UNIT_ASSERT_VALUES_EQUAL(data,
+ "Key 1 = Upsert value = Set foo value2 = Empty NULL\n");
+ }
+
+ Cerr << "...restarting tablet" << Endl;
+ env.SendSync(new TEvents::TEvPoison, false, true);
+ env.WaitForGone();
+ env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
+ return new TTestFlatTablet(env.Edge, tablet, info);
+ });
+ env.WaitForWakeUp();
+
+ // We shouldn't see uncommitted row after a reboot as well
+ {
+ Cerr << "... checking rows" << Endl;
+ TString data;
+ env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) }, /* retry */ true);
+ UNIT_ASSERT_VALUES_EQUAL(data,
+ "Key 1 = Upsert value = Set foo value2 = Empty NULL\n");
+ }
+ }
+
}
Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTxAndBlobs) {
diff --git a/ydb/core/tablet_flat/flat_mem_warm.h b/ydb/core/tablet_flat/flat_mem_warm.h
index 0abcf84acd..b12efed491 100644
--- a/ydb/core/tablet_flat/flat_mem_warm.h
+++ b/ydb/core/tablet_flat/flat_mem_warm.h
@@ -460,7 +460,8 @@ namespace NMem {
return TxIdStats;
}
- void CommitTx(ui64 txId, TRowVersion rowVersion) {
+ bool CommitTx(ui64 txId, TRowVersion rowVersion) {
+ bool newRef = false;
auto it = Committed.find(txId);
bool toInsert = (it == Committed.end());
@@ -480,12 +481,16 @@ namespace NMem {
UndoBuffer.push_back(TUndoOpInsertRemoved{ txId });
}
Removed.erase(itRemoved);
+ } else {
+ newRef = true;
}
}
}
+ return newRef;
}
- void RemoveTx(ui64 txId) {
+ bool RemoveTx(ui64 txId) {
+ bool newRef = false;
auto it = Committed.find(txId);
if (it == Committed.end()) {
auto itRemoved = Removed.find(txId);
@@ -494,8 +499,10 @@ namespace NMem {
UndoBuffer.push_back(TUndoOpEraseRemoved{ txId });
}
Removed.insert(txId);
+ newRef = true;
}
}
+ return newRef;
}
const absl::flat_hash_map<ui64, TRowVersion>& GetCommittedTransactions() const {
diff --git a/ydb/core/tablet_flat/flat_ops_compact.h b/ydb/core/tablet_flat/flat_ops_compact.h
index ce7be59a0c..a2adf6a025 100644
--- a/ydb/core/tablet_flat/flat_ops_compact.h
+++ b/ydb/core/tablet_flat/flat_ops_compact.h
@@ -255,40 +255,61 @@ namespace NTabletFlatExecutor {
void WriteTxStatus() noexcept
{
- if (!Conf->CommittedTransactions && !Conf->RemovedTransactions) {
- // Nothing to write
- return;
+ if (!Conf->Frozen && !Conf->TxStatus) {
+ // Nothing to compact
}
- THashSet<ui64> txFilter;
+ absl::flat_hash_map<ui64, std::optional<TRowVersion>> status;
+ auto mergeStatus = [&](ui64 txId, const std::optional<TRowVersion>& version) {
+ if (Conf->GarbageTransactions.Contains(txId)) {
+ // We don't write garbage transactions
+ return;
+ }
+ auto it = status.find(txId);
+ if (it == status.end()) {
+ status[txId] = version;
+ } else if (version) {
+ if (!it->second) {
+ // commit wins over remove
+ it->second = version;
+ } else if (*version < *it->second) {
+ // lowest commit version wins
+ it->second = version;
+ }
+ }
+ };
+
for (const auto& memTable : Conf->Frozen) {
for (const auto& pr : memTable->GetCommittedTransactions()) {
- txFilter.insert(pr.first);
+ mergeStatus(pr.first, pr.second);
}
for (const ui64 txId : memTable->GetRemovedTransactions()) {
- txFilter.insert(txId);
+ mergeStatus(txId, std::nullopt);
}
}
for (const auto& txStatus : Conf->TxStatus) {
for (const auto& item : txStatus->TxStatusPage->GetCommittedItems()) {
- txFilter.insert(item.GetTxId());
+ mergeStatus(item.GetTxId(), item.GetRowVersion());
}
for (const auto& item : txStatus->TxStatusPage->GetRemovedItems()) {
- txFilter.insert(item.GetTxId());
+ mergeStatus(item.GetTxId(), std::nullopt);
}
}
- NTable::NPage::TTxStatusBuilder builder;
- for (const auto& pr : Conf->CommittedTransactions) {
- if (txFilter.contains(pr.first)) {
- builder.AddCommitted(pr.first, pr.second);
- }
+ if (status.empty()) {
+ // Nothing to write
+ return;
}
- for (const ui64 txId : Conf->RemovedTransactions) {
- if (txFilter.contains(txId)) {
- builder.AddRemoved(txId);
+
+ NTable::NPage::TTxStatusBuilder builder;
+ for (const auto& pr : status) {
+ if (pr.second) {
+ builder.AddCommitted(pr.first, *pr.second);
+ } else {
+ builder.AddRemoved(pr.first);
}
}
+
auto data = builder.Finish();
if (!data) {
// Don't write an empty page
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index d0acb0baf1..18519041ec 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -49,14 +49,18 @@ void TTable::RollbackChanges()
struct TApplyRollbackOp {
TTable* Self;
- void operator()(const TRollbackRemoveTxRef& op) const {
- auto it = Self->TxRefs.find(op.TxId);
- Y_ABORT_UNLESS(it != Self->TxRefs.end());
+ void operator()(const TRollbackRemoveTxDataRef& op) const {
+ auto it = Self->TxDataRefs.find(op.TxId);
+ Y_ABORT_UNLESS(it != Self->TxDataRefs.end());
if (0 == --it->second) {
- Self->TxRefs.erase(it);
+ Self->TxDataRefs.erase(it);
}
}
+ void operator()(const TRollbackRemoveTxStatusRef& op) const {
+ Self->RemoveTxStatusRef(op.TxId);
+ }
+
void operator()(const TRollbackAddCommittedTx& op) const {
Self->CommittedTransactions.Add(op.TxId, op.RowVersion);
}
@@ -210,7 +214,7 @@ TIntrusiveConstPtr<TRowScheme> TTable::GetScheme() const noexcept
return Scheme;
}
-TAutoPtr<TSubset> TTable::Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch head)
+TAutoPtr<TSubset> TTable::CompactionSubset(TEpoch head, TArrayRef<const TLogoBlobID> bundle)
{
head = Min(head, Epoch);
@@ -245,6 +249,49 @@ TAutoPtr<TSubset> TTable::Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch hea
subset->CommittedTransactions = CommittedTransactions;
subset->RemovedTransactions = RemovedTransactions;
+ subset->GarbageTransactions = GarbageTransactions;
+
+ return subset;
+}
+
+TAutoPtr<TSubset> TTable::PartSwitchSubset(TEpoch head, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus)
+{
+ head = Min(head, Epoch);
+
+ TAutoPtr<TSubset> subset = new TSubset(head, Scheme);
+
+ if (head > TEpoch::Zero()) {
+ for (auto &x : Frozen) {
+ if (x->Epoch < head) {
+ subset->Frozen.emplace_back(x, x->Immediate());
+ }
+ }
+ if (MutableBackup && MutableBackup->Epoch < head) {
+ subset->Frozen.emplace_back(MutableBackup, MutableBackup->Immediate());
+ }
+ }
+
+ subset->Flatten.reserve(bundle.size());
+ for (const TLogoBlobID &token : bundle) {
+ if (auto* c = ColdParts.FindPtr(token)) {
+ subset->ColdParts.push_back(*c);
+ continue;
+ }
+ auto* p = Flatten.FindPtr(token);
+ Y_VERIFY_S(p, "Cannot find part " << token);
+ subset->Flatten.push_back(*p);
+ }
+
+ subset->TxStatus.reserve(txStatus.size());
+ for (const TLogoBlobID &token : txStatus) {
+ auto* p = TxStatus.FindPtr(token);
+ Y_VERIFY_S(p, "Cannot find tx status " << token);
+ subset->TxStatus.push_back(*p);
+ }
+
+ subset->CommittedTransactions = CommittedTransactions;
+ subset->RemovedTransactions = RemovedTransactions;
+ subset->GarbageTransactions = GarbageTransactions;
return subset;
}
@@ -281,6 +328,7 @@ TAutoPtr<TSubset> TTable::Subset(TEpoch head) const noexcept
// However it can still theoretically be used for iteration or compaction
subset->CommittedTransactions = CommittedTransactions;
subset->RemovedTransactions = RemovedTransactions;
+ subset->GarbageTransactions = GarbageTransactions;
return subset;
}
@@ -346,8 +394,7 @@ TAutoPtr<TSubset> TTable::Unwrap() noexcept
auto subset = Subset(TEpoch::Max());
- Replace({ }, *subset);
- ReplaceTxStatus({ }, *subset);
+ Replace(*subset, { }, { });
Y_ABORT_UNLESS(!(Flatten || Frozen || Mutable || TxStatus));
@@ -384,11 +431,14 @@ void TTable::ReplaceSlices(TBundleSlicesMap slices) noexcept
}
}
-void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset) noexcept
+void TTable::Replace(
+ const TSubset& subset,
+ TArrayRef<const TPartView> newParts,
+ TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus) noexcept
{
Y_ABORT_UNLESS(!RollbackState, "Cannot perform this in a transaction");
- for (const auto &partView : partViews) {
+ for (const auto& partView : newParts) {
Y_ABORT_UNLESS(partView, "Replace(...) shouldn't get empty parts");
Y_ABORT_UNLESS(!partView.Screen, "Replace(...) shouldn't get screened parts");
Y_ABORT_UNLESS(partView.Slices && *partView.Slices, "Got parts without slices");
@@ -403,9 +453,13 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
bool removingOld = false;
bool addingNew = false;
- THashSet<ui64> checkNewTransactions;
- for (auto &memTable : subset.Frozen) {
+ // Note: we remove old parts first and add new ones next
+ // Refcount cannot become zero more than once so vectors are unique
+ std::vector<ui64> checkTxDataRefs;
+ std::vector<ui64> checkTxStatusRefs;
+
+ for (auto& memTable : subset.Frozen) {
removingOld = true;
const auto found = Frozen.erase(memTable.MemTable);
@@ -418,10 +472,27 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
for (const auto &pr : memTable.MemTable->GetTxIdStats()) {
const ui64 txId = pr.first;
- auto& count = TxRefs.at(txId);
+ auto& count = TxDataRefs.at(txId);
Y_ABORT_UNLESS(count > 0);
if (0 == --count) {
- checkNewTransactions.insert(txId);
+ checkTxDataRefs.push_back(txId);
+ }
+ }
+
+ for (const auto &pr : memTable.MemTable->GetCommittedTransactions()) {
+ const ui64 txId = pr.first;
+ auto& count = TxStatusRefs.at(txId);
+ Y_ABORT_UNLESS(count > 0);
+ if (0 == --count) {
+ checkTxStatusRefs.push_back(txId);
+ }
+ }
+
+ for (ui64 txId : memTable.MemTable->GetRemovedTransactions()) {
+ auto& count = TxStatusRefs.at(txId);
+ Y_ABORT_UNLESS(count > 0);
+ if (0 == --count) {
+ checkTxStatusRefs.push_back(txId);
}
}
}
@@ -458,10 +529,10 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
if (existing->TxIdStats) {
for (const auto& item : existing->TxIdStats->GetItems()) {
const ui64 txId = item.GetTxId();
- auto& count = TxRefs.at(txId);
+ auto& count = TxDataRefs.at(txId);
Y_ABORT_UNLESS(count > 0);
if (0 == --count) {
- checkNewTransactions.insert(txId);
+ checkTxDataRefs.push_back(txId);
}
}
}
@@ -482,7 +553,33 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
ColdParts.erase(it);
}
- for (const auto &partView : partViews) {
+ for (auto& part : subset.TxStatus) {
+ removingOld = true;
+ Y_ABORT_UNLESS(part, "Unexpected empty TTxStatusPart in TSubset");
+
+ auto it = TxStatus.find(part->Label);
+ Y_ABORT_UNLESS(it != TxStatus.end());
+ TxStatus.erase(it);
+
+ for (auto& item : part->TxStatusPage->GetCommittedItems()) {
+ const ui64 txId = item.GetTxId();
+ auto& count = TxStatusRefs.at(txId);
+ Y_ABORT_UNLESS(count > 0);
+ if (0 == --count) {
+ checkTxStatusRefs.push_back(txId);
+ }
+ }
+ for (auto& item : part->TxStatusPage->GetRemovedItems()) {
+ const ui64 txId = item.GetTxId();
+ auto& count = TxStatusRefs.at(txId);
+ Y_ABORT_UNLESS(count > 0);
+ if (0 == --count) {
+ checkTxStatusRefs.push_back(txId);
+ }
+ }
+ }
+
+ for (const auto &partView : newParts) {
addingNew = true;
if (Mutable && partView->Epoch >= Mutable->Epoch) {
Y_Fail("Replace with " << NFmt::Do(*partView) << " after mutable epoch " << Mutable->Epoch);
@@ -497,20 +594,59 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
AddSafe(partView);
}
- for (ui64 txId : checkNewTransactions) {
- auto it = TxRefs.find(txId);
- Y_ABORT_UNLESS(it != TxRefs.end());
+ for (const auto& txStatus : newTxStatus) {
+ if (Mutable && txStatus->Epoch >= Mutable->Epoch) {
+ Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after mutable epoch " << Mutable->Epoch);
+ }
+
+ if (Frozen && txStatus->Epoch >= (*Frozen.begin())->Epoch) {
+ Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after frozen epoch " << (*Frozen.begin())->Epoch);
+ }
+
+ Epoch = Max(Epoch, txStatus->Epoch + 1);
+
+ auto res = TxStatus.emplace(txStatus->Label, txStatus);
+ Y_ABORT_UNLESS(res.second, "Unexpected failure to add a new TTxStatusPart");
+
+ for (auto& item : txStatus->TxStatusPage->GetCommittedItems()) {
+ const ui64 txId = item.GetTxId();
+ ++TxStatusRefs[txId];
+ }
+ for (auto& item : txStatus->TxStatusPage->GetRemovedItems()) {
+ const ui64 txId = item.GetTxId();
+ ++TxStatusRefs[txId];
+ }
+ }
+
+ for (ui64 txId : checkTxDataRefs) {
+ auto it = TxDataRefs.find(txId);
+ Y_ABORT_UNLESS(it != TxDataRefs.end());
if (it->second == 0) {
- // Transaction no longer needs to be tracked
+ // Transaction no longer has any known rows
+ TxDataRefs.erase(it);
+ OpenTxs.erase(txId);
if (!ColdParts) {
- CommittedTransactions.Remove(txId);
- RemovedTransactions.Remove(txId);
- DecidedTransactions.Remove(txId);
+ GarbageTransactions.Add(txId);
} else {
CheckTransactions.insert(txId);
}
- TxRefs.erase(it);
- OpenTxs.erase(txId);
+ }
+ }
+
+ for (ui64 txId : checkTxStatusRefs) {
+ auto it = TxStatusRefs.find(txId);
+ Y_ABORT_UNLESS(it != TxStatusRefs.end());
+ if (it->second == 0) {
+ // This transaction no longer has any known status
+ TxStatusRefs.erase(it);
+ CommittedTransactions.Remove(txId);
+ RemovedTransactions.Remove(txId);
+ DecidedTransactions.Remove(txId);
+ GarbageTransactions.Remove(txId);
+ if (TxDataRefs.contains(txId)) {
+ // In the unlikely case it has some data it is now open
+ OpenTxs.insert(txId);
+ }
}
}
@@ -526,34 +662,6 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
}
}
-void TTable::ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus, const TSubset &subset) noexcept
-{
- Y_ABORT_UNLESS(!RollbackState, "Cannot perform this in a transaction");
-
- for (auto &part : subset.TxStatus) {
- Y_ABORT_UNLESS(part, "Unexpected empty TTxStatusPart in TSubset");
-
- auto it = TxStatus.find(part->Label);
- Y_ABORT_UNLESS(it != TxStatus.end());
- TxStatus.erase(it);
- }
-
- for (const auto& txStatus : newTxStatus) {
- if (Mutable && txStatus->Epoch >= Mutable->Epoch) {
- Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after mutable epoch " << Mutable->Epoch);
- }
-
- if (Frozen && txStatus->Epoch >= (*Frozen.begin())->Epoch) {
- Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after frozen epoch " << (*Frozen.begin())->Epoch);
- }
-
- Epoch = Max(Epoch, txStatus->Epoch + 1);
-
- auto res = TxStatus.emplace(txStatus->Label, txStatus);
- Y_ABORT_UNLESS(res.second, "Unexpected failure to add a new TTxStatusPart");
- }
-}
-
void TTable::Merge(TPartView partView) noexcept
{
Y_ABORT_UNLESS(!RollbackState, "Cannot perform this in a transaction");
@@ -639,9 +747,10 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
}
}
}
- if (!TxRefs.contains(txId)) {
+ if (!TxDataRefs.contains(txId)) {
CheckTransactions.insert(txId);
}
+ ++TxStatusRefs[txId];
DecidedTransactions.Add(txId);
OpenTxs.erase(txId);
}
@@ -654,9 +763,10 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
// This is not an error in some cases, but may be suspicious
RemovedCommittedTxs++;
}
- if (!TxRefs.contains(txId)) {
+ if (!TxDataRefs.contains(txId)) {
CheckTransactions.insert(txId);
}
+ ++TxStatusRefs[txId];
DecidedTransactions.Add(txId);
OpenTxs.erase(txId);
}
@@ -679,15 +789,18 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
// eventuality, so doesn't need to be invalidated.
}
+void TTable::MergeDone() noexcept
+{
+ ProcessCheckTransactions();
+}
+
void TTable::ProcessCheckTransactions() noexcept
{
if (!ColdParts) {
for (ui64 txId : CheckTransactions) {
- auto it = TxRefs.find(txId);
- if (it == TxRefs.end()) {
- CommittedTransactions.Remove(txId);
- RemovedTransactions.Remove(txId);
- DecidedTransactions.Remove(txId);
+ auto it = TxDataRefs.find(txId);
+ if (it == TxDataRefs.end()) {
+ GarbageTransactions.Add(txId);
}
}
CheckTransactions.clear();
@@ -784,8 +897,8 @@ void TTable::AddSafe(TPartView partView)
if (partView->TxIdStats) {
for (const auto& item : partView->TxIdStats->GetItems()) {
const ui64 txId = item.GetTxId();
- const auto newCount = ++TxRefs[txId];
- if (newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId)) {
+ const auto newCount = ++TxDataRefs[txId];
+ if (newCount == 1 && !TxStatusRefs.contains(txId)) {
OpenTxs.insert(txId);
}
}
@@ -885,10 +998,10 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemG
}
}
-void TTable::AddTxRef(ui64 txId)
+void TTable::AddTxDataRef(ui64 txId)
{
- const auto newCount = ++TxRefs[txId];
- const bool addOpenTx = newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId);
+ const auto newCount = ++TxDataRefs[txId];
+ const bool addOpenTx = newCount == 1 && !TxStatusRefs.contains(txId);
if (addOpenTx) {
auto res = OpenTxs.insert(txId);
Y_ABORT_UNLESS(res.second);
@@ -896,17 +1009,47 @@ void TTable::AddTxRef(ui64 txId)
"Decided transaction %" PRIu64 " is both open and decided", txId);
}
if (RollbackState) {
- RollbackOps.emplace_back(TRollbackRemoveTxRef{ txId });
+ RollbackOps.emplace_back(TRollbackRemoveTxDataRef{ txId });
if (addOpenTx) {
RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId });
}
}
}
+void TTable::AddTxStatusRef(ui64 txId)
+{
+ ++TxStatusRefs[txId];
+ if (RollbackState) {
+ RollbackOps.emplace_back(TRollbackRemoveTxStatusRef{ txId });
+ }
+}
+
+void TTable::RemoveTxStatusRef(ui64 txId)
+{
+ auto it = TxStatusRefs.find(txId);
+ Y_ABORT_UNLESS(it != TxStatusRefs.end());
+ Y_ABORT_UNLESS(it->second > 0);
+ if (0 == --it->second) {
+ // This was the last reference
+ TxStatusRefs.erase(it);
+ // Remove the corresponding committed/removed record
+ CommittedTransactions.Remove(txId);
+ RemovedTransactions.Remove(txId);
+ Y_DEBUG_ABORT_UNLESS(!GarbageTransactions.Contains(txId),
+ "Garbage transaction %" PRIu64 " has no status", txId);
+ Y_DEBUG_ABORT_UNLESS(!DecidedTransactions.Contains(txId),
+ "Decided transaction %" PRIu64 " has no status", txId);
+ // Transactions with data but without tx status are open
+ if (TxDataRefs.contains(txId)) {
+ OpenTxs.insert(txId);
+ }
+ }
+}
+
void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, ui64 txId)
{
auto& memTable = MemTable();
- bool hadTxRef = memTable.GetTxIdStats().contains(txId);
+ bool hadTxDataRef = memTable.GetTxIdStats().contains(txId);
if (ErasedKeysCache) {
const TCelled cells(key, *Scheme->Keys, true);
@@ -920,11 +1063,11 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMe
TRowVersion rowVersion(Max<ui64>(), txId);
MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions);
- if (!hadTxRef) {
+ if (!hadTxDataRef) {
Y_DEBUG_ABORT_UNLESS(memTable.GetTxIdStats().contains(txId));
- AddTxRef(txId);
+ AddTxDataRef(txId);
} else {
- Y_DEBUG_ABORT_UNLESS(TxRefs[txId] > 0);
+ Y_DEBUG_ABORT_UNLESS(TxDataRefs[txId] > 0);
}
if (TableObserver) {
@@ -935,7 +1078,9 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMe
void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
{
// TODO: track suspicious transactions (not open at commit time)
- MemTable().CommitTx(txId, rowVersion);
+ if (MemTable().CommitTx(txId, rowVersion)) {
+ AddTxStatusRef(txId);
+ }
// Note: it is possible to have multiple CommitTx for the same TxId but at
// different row versions. The commit with the minimum row version wins.
@@ -975,7 +1120,9 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
void TTable::RemoveTx(ui64 txId)
{
// TODO: track suspicious transactions (not open at remove time)
- MemTable().RemoveTx(txId);
+ if (MemTable().RemoveTx(txId)) {
+ AddTxStatusRef(txId);
+ }
// Note: it is possible to have both CommitTx and RemoveTx for the same TxId
// due to complicated split/merge shard interactions. The commit actually
@@ -1008,7 +1155,7 @@ bool TTable::HasOpenTx(ui64 txId) const
bool TTable::HasTxData(ui64 txId) const
{
- return TxRefs.contains(txId);
+ return TxDataRefs.contains(txId) || TxStatusRefs.contains(txId);
}
bool TTable::HasCommittedTx(ui64 txId) const
@@ -1033,7 +1180,7 @@ size_t TTable::GetOpenTxCount() const
size_t TTable::GetTxsWithDataCount() const
{
- return TxRefs.size();
+ return TxDataRefs.size();
}
size_t TTable::GetCommittedTxCount() const
@@ -1050,7 +1197,7 @@ TTableRuntimeStats TTable::RuntimeStats() const noexcept
{
return TTableRuntimeStats{
.OpenTxCount = OpenTxs.size(),
- .TxsWithDataCount = TxRefs.size(),
+ .TxsWithDataCount = TxDataRefs.size() + GarbageTransactions.Size(),
.CommittedTxCount = CommittedTransactions.Size(),
.RemovedTxCount = RemovedTransactions.Size(),
.RemovedCommittedTxs = RemovedCommittedTxs,
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index 493a5ed40d..405ab7a109 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -84,7 +84,8 @@ public:
return Epoch;
}
- TAutoPtr<TSubset> Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch edge);
+ TAutoPtr<TSubset> CompactionSubset(TEpoch edge, TArrayRef<const TLogoBlobID> bundle);
+ TAutoPtr<TSubset> PartSwitchSubset(TEpoch edge, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus);
TAutoPtr<TSubset> Subset(TEpoch edge) const noexcept;
TAutoPtr<TSubset> ScanSnapshot(TRowVersion snapshot = TRowVersion::Max()) noexcept;
TAutoPtr<TSubset> Unwrap() noexcept; /* full Subset(..) + final Replace(..) */
@@ -110,8 +111,7 @@ public:
be displaced from table with Clean() method eventually.
*/
- void Replace(TArrayRef<const TPartView>, const TSubset&) noexcept;
- void ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>, const TSubset&) noexcept;
+ void Replace(const TSubset&, TArrayRef<const TPartView>, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>) noexcept;
/*_ Special interface for clonig flatten part of table for outer usage.
Cook some TPartView with Subset(...) method and/or TShrink tool first and
@@ -121,6 +121,7 @@ public:
void Merge(TPartView partView) noexcept;
void Merge(TIntrusiveConstPtr<TColdPart> part) noexcept;
void Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept;
+ void MergeDone() noexcept;
void ProcessCheckTransactions() noexcept;
/**
@@ -339,7 +340,9 @@ private:
void RemoveStat(const TPartView& partView);
private:
- void AddTxRef(ui64 txId);
+ void AddTxDataRef(ui64 txId);
+ void AddTxStatusRef(ui64 txId);
+ void RemoveTxStatusRef(ui64 txId);
private:
TEpoch Epoch; /* Monotonic table change number, with holes */
@@ -361,18 +364,38 @@ private:
TRowVersionRanges RemovedRowVersions;
- absl::flat_hash_map<ui64, size_t> TxRefs;
+ // The number of entities (memtable/sst) that have rows with a TxId. As
+ // long as there is at least one row with a TxId its commit/remove status
+ // must be preserved.
+ absl::flat_hash_map<ui64, size_t> TxDataRefs;
+
+ // The number of entities (memtable/txstatus) that have a commit/remove
+ // status for a TxId. As long as there is at least one such entity the
+ // transaction cannot be used again without artifacts, and must stay
+ // in committed/removed set.
+ absl::flat_hash_map<ui64, size_t> TxStatusRefs;
+
+ // A set of open transactions, i.e. transactions that have rows with the
+ // specified TxId and that have not been committed or removed yet.
absl::flat_hash_set<ui64> OpenTxs;
+
+ // A set of transactions that need to be re-checked after a merge.
absl::flat_hash_set<ui64> CheckTransactions;
+
TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
TTransactionSet DecidedTransactions;
+ TTransactionSet GarbageTransactions;
TIntrusivePtr<ITableObserver> TableObserver;
ui64 RemovedCommittedTxs = 0;
private:
- struct TRollbackRemoveTxRef {
+ struct TRollbackRemoveTxDataRef {
+ ui64 TxId;
+ };
+
+ struct TRollbackRemoveTxStatusRef {
ui64 TxId;
};
@@ -402,7 +425,8 @@ private:
};
using TRollbackOp = std::variant<
- TRollbackRemoveTxRef,
+ TRollbackRemoveTxDataRef,
+ TRollbackRemoveTxStatusRef,
TRollbackAddCommittedTx,
TRollbackRemoveCommittedTx,
TRollbackAddRemovedTx,
diff --git a/ydb/core/tablet_flat/flat_table_subset.h b/ydb/core/tablet_flat/flat_table_subset.h
index 0c7355abfa..fca40ac93b 100644
--- a/ydb/core/tablet_flat/flat_table_subset.h
+++ b/ydb/core/tablet_flat/flat_table_subset.h
@@ -118,6 +118,7 @@ namespace NTable {
TVector<TIntrusiveConstPtr<TColdPart>> ColdParts;
TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
+ TTransactionSet GarbageTransactions;
TVector<TIntrusiveConstPtr<TTxStatusPart>> TxStatus;
};
diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
index 5cfe807b00..47d3526b4d 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
@@ -246,9 +246,12 @@ namespace NTest {
if (last /* make full subset */) {
subset = Base->Subset(table, TEpoch::Max(), { }, { });
} else /* only flush memtables */ {
- subset = Base->Subset(table, { }, TEpoch::Max());
+ subset = Base->CompactionSubset(table, TEpoch::Max(), { });
}
+ // Note: we don't compact TxStatus in these tests
+ Y_ABORT_UNLESS(subset->TxStatus.empty());
+
TLogoBlobID logo(1, Gen, ++Step, 1, 0, 0);
auto *family = Base->GetScheme().DefaultFamilyFor(table);
@@ -279,7 +282,7 @@ namespace NTest {
for (auto &part : eggs.Parts)
partViews.push_back({ part, nullptr, part->Slices });
- Base->Replace(table, std::move(partViews), *subset);
+ Base->Replace(table, *subset, std::move(partViews), { });
return *this;
}
diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
index 96b97e01f6..be28e08680 100644
--- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
+++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h
@@ -125,11 +125,14 @@ public:
SnapshotTable(params->Table);
}
- auto subset = DB.Subset(params->Table, { }, params->Edge.Head);
+ auto subset = DB.CompactionSubset(params->Table, params->Edge.Head, { });
if (params->Parts) {
subset->Flatten.insert(subset->Flatten.end(), params->Parts.begin(), params->Parts.end());
}
+ // Note: we don't compact TxStatus in these tests
+ Y_ABORT_UNLESS(subset->TxStatus.empty());
+
Y_ABORT_UNLESS(!*subset || subset->IsStickedToHead());
const auto& scheme = DB.GetScheme();
@@ -163,8 +166,7 @@ public:
Y_ABORT_UNLESS(parts.back());
}
- auto partsCopy = parts;
- DB.Replace(params->Table, partsCopy, *subset);
+ DB.Replace(params->Table, *subset, parts, { });
return MakeHolder<TCompactionResult>(subset->Epoch(), std::move(parts));
}