diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2025-02-18 20:26:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-18 20:26:27 +0300 |
commit | eb1eed1cc26be0126ffacee14074898a2ad2b4b2 (patch) | |
tree | a3ca6675cf73db27c68835de3b9933be1e3dc8f3 | |
parent | e31ed432d1637866af870abf13b08bd3610f5ebc (diff) | |
download | ydb-eb1eed1cc26be0126ffacee14074898a2ad2b4b2.tar.gz |
Avoid persistent and in-memory tx status getting out-of-sync (#14704)
-rw-r--r-- | ydb/core/tablet_flat/flat_boot_stages.h | 3 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.h | 11 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_dbase_naked.h | 31 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.cpp | 35 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_misc.h | 7 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_ut.cpp | 94 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_mem_warm.h | 11 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_ops_compact.h | 53 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.cpp | 297 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.h | 38 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table_subset.h | 1 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/test_dbase.h | 7 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/flat_comp_ut_common.h | 8 |
14 files changed, 481 insertions, 142 deletions
diff --git a/ydb/core/tablet_flat/flat_boot_stages.h b/ydb/core/tablet_flat/flat_boot_stages.h index 8fde38a5e7..d796970800 100644 --- a/ydb/core/tablet_flat/flat_boot_stages.h +++ b/ydb/core/tablet_flat/flat_boot_stages.h @@ -194,6 +194,9 @@ namespace NBoot { const auto was = Back->DatabaseImpl->Rewind(Back->Serial); + // Notify database that all merges have completed + Back->DatabaseImpl->MergeDone(); + result.Database = new NTable::TDatabase(Back->DatabaseImpl.Release()); if (auto logl = Env->Logger()->Log(ELnLev::Info)) { diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index acc5bd968a..68bc5b416b 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -572,9 +572,14 @@ TEpoch TDatabase::TxSnapTable(ui32 table) return DatabaseImpl->FlushTable(table); } -TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const +TAutoPtr<TSubset> TDatabase::CompactionSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle) const { - return Require(table)->Subset(bundle, before); + return Require(table)->CompactionSubset(before, bundle); +} + +TAutoPtr<TSubset> TDatabase::PartSwitchSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus) const +{ + return Require(table)->PartSwitchSubset(before, bundle, txStatus); } TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const @@ -617,14 +622,13 @@ void TDatabase::ReplaceSlices(ui32 table, TBundleSlicesMap slices) return DatabaseImpl->ReplaceSlices(table, std::move(slices)); } -void TDatabase::Replace(ui32 table, TArrayRef<const TPartView> partViews, const TSubset &subset) +void TDatabase::Replace( + ui32 table, + const TSubset& subset, + TArrayRef<const TPartView> newParts, + TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus) { - return DatabaseImpl->Replace(table, partViews, subset); -} - -void TDatabase::ReplaceTxStatus(ui32 table, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> txStatus, const TSubset &subset) -{ - return DatabaseImpl->ReplaceTxStatus(table, txStatus, subset); + return DatabaseImpl->Replace(table, subset, newParts, newTxStatus); } void TDatabase::Merge(ui32 table, TPartView partView) @@ -642,6 +646,11 @@ void TDatabase::Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart> txStatus) return DatabaseImpl->Merge(table, std::move(txStatus)); } +void TDatabase::MergeDone(ui32 table) +{ + return DatabaseImpl->MergeDone(table); +} + TAlter& TDatabase::Alter() { Y_ABORT_UNLESS(Redo, "Scheme change must be done within a transaction"); diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 373e50f371..9f086987ea 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -226,7 +226,8 @@ public: void UpdateApproximateFreeSharesByChannel(const THashMap<ui32, float>& approximateFreeSpaceShareByChannel); TString SnapshotToLog(ui32 table, TTxStamp); - TAutoPtr<TSubset> Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const; + TAutoPtr<TSubset> CompactionSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle) const; + TAutoPtr<TSubset> PartSwitchSubset(ui32 table, TEpoch before, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus) const; TAutoPtr<TSubset> Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const; TAutoPtr<TSubset> ScanSnapshot(ui32 table, TRowVersion snapshot = TRowVersion::Max()); @@ -235,11 +236,15 @@ public: TBundleSlicesMap LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const; void ReplaceSlices(ui32 table, TBundleSlicesMap slices); - void Replace(ui32 table, TArrayRef<const TPartView>, const TSubset&); - void ReplaceTxStatus(ui32 table, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>, const TSubset&); + void Replace( + ui32 table, + const TSubset&, + TArrayRef<const TPartView>, + TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>); void Merge(ui32 table, TPartView); void Merge(ui32 table, TIntrusiveConstPtr<TColdPart>); void Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart>); + void MergeDone(ui32 table); void DebugDumpTable(ui32 table, IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const; void DebugDump(IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const; diff --git a/ydb/core/tablet_flat/flat_dbase_naked.h b/ydb/core/tablet_flat/flat_dbase_naked.h index 043fd7a386..339dba9656 100644 --- a/ydb/core/tablet_flat/flat_dbase_naked.h +++ b/ydb/core/tablet_flat/flat_dbase_naked.h @@ -481,51 +481,62 @@ namespace NTable { wrap.Aggr(Stats, true /* enter */); } - void Replace(ui32 tid, TArrayRef<const TPartView> partViews, const TSubset &subset) noexcept + void Replace( + ui32 tid, + const TSubset &subset, + TArrayRef<const TPartView> newParts, + TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus) noexcept { auto &wrap = Get(tid, true); wrap.Aggr(Stats, false /* leave */); - wrap->Replace(partViews, subset); + wrap->Replace(subset, newParts, newTxStatus); wrap.Aggr(Stats, true /* enter */); } - void ReplaceTxStatus(ui32 tid, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> txStatus, const TSubset &subset) noexcept + void Merge(ui32 tid, TPartView partView) noexcept { auto &wrap = Get(tid, true); wrap.Aggr(Stats, false /* leave */); - wrap->ReplaceTxStatus(txStatus, subset); + wrap->Merge(std::move(partView)); wrap.Aggr(Stats, true /* enter */); } - void Merge(ui32 tid, TPartView partView) noexcept + void Merge(ui32 tid, TIntrusiveConstPtr<TColdPart> part) noexcept { auto &wrap = Get(tid, true); wrap.Aggr(Stats, false /* leave */); - wrap->Merge(std::move(partView)); + wrap->Merge(std::move(part)); wrap.Aggr(Stats, true /* enter */); } - void Merge(ui32 tid, TIntrusiveConstPtr<TColdPart> part) noexcept + void Merge(ui32 tid, TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept { auto &wrap = Get(tid, true); wrap.Aggr(Stats, false /* leave */); - wrap->Merge(std::move(part)); + wrap->Merge(std::move(txStatus)); wrap.Aggr(Stats, true /* enter */); } - void Merge(ui32 tid, TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept + void MergeDone(ui32 tid) noexcept { auto &wrap = Get(tid, true); wrap.Aggr(Stats, false /* leave */); - wrap->Merge(std::move(txStatus)); + wrap->MergeDone(); wrap.Aggr(Stats, true /* enter */); } + void MergeDone() noexcept + { + for (auto &pr : Tables) { + MergeDone(pr.first); + } + } + bool ApplySchema(const TSchemeChanges &delta) { TModifier modifier(*Scheme); diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 9caaa1bfac..4c4109064b 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1479,7 +1479,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } if (partSwitch.FollowerUpdateStep) { - auto subset = Database->Subset(partSwitch.TableId, partSwitch.Leaving, partSwitch.Head); + auto subset = Database->PartSwitchSubset(partSwitch.TableId, partSwitch.Head, partSwitch.Leaving, partSwitch.LeavingTxStatus); if (partSwitch.Head != subset->Head) { Y_ABORT("Follower table epoch head has diverged from leader"); @@ -1488,16 +1488,17 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } Y_ABORT_UNLESS(newColdParts.empty(), "Unexpected cold part at a follower"); - Database->Replace(partSwitch.TableId, std::move(newParts), *subset); - Database->ReplaceTxStatus(partSwitch.TableId, std::move(newTxStatus), *subset); + Database->Replace(partSwitch.TableId, *subset, std::move(newParts), std::move(newTxStatus)); for (auto &gone : subset->Flatten) DropCachesOfBundle(*gone); Send(Owner->Tablet(), new TEvTablet::TEvFGcAck(Owner->TabletID(), Generation(), partSwitch.FollowerUpdateStep)); } else { + bool merged = false; for (auto &partView : newParts) { Database->Merge(partSwitch.TableId, partView); + merged = true; if (CompactionLogic) { CompactionLogic->BorrowedPart(partSwitch.TableId, std::move(partView)); @@ -1505,6 +1506,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } for (auto &part : newColdParts) { Database->Merge(partSwitch.TableId, part); + merged = true; if (CompactionLogic) { CompactionLogic->BorrowedPart(partSwitch.TableId, std::move(part)); @@ -1512,6 +1514,10 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } for (auto &txStatus : newTxStatus) { Database->Merge(partSwitch.TableId, txStatus); + merged = true; + } + if (merged) { + Database->MergeDone(partSwitch.TableId); } } @@ -1533,7 +1539,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { // N.B. there should be a single source table per part switch for (auto& [sourceTable, state] : perTable) { // Rebase source parts to their respective new epochs - auto srcSubset = Database->Subset(sourceTable, state.Bundles, NTable::TEpoch::Zero()); + auto srcSubset = Database->PartSwitchSubset(sourceTable, NTable::TEpoch::Zero(), state.Bundles, { }); TVector<NTable::TPartView> rebased(Reserve(srcSubset->Flatten.size())); for (const auto& partView : srcSubset->Flatten) { Y_ABORT_UNLESS(!partView->TxIdStats, "Cannot move parts with uncommitted deltas"); @@ -1542,7 +1548,7 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } // Remove source parts from the source table - Database->Replace(sourceTable, { }, *srcSubset); + Database->Replace(sourceTable, *srcSubset, { }, { }); if (CompactionLogic) { CompactionLogic->RemovedParts(sourceTable, state.Bundles); @@ -1557,6 +1563,8 @@ void TExecutor::ApplyExternalPartSwitch(TPendingPartSwitch &partSwitch) { } } } + + Database->MergeDone(partSwitch.TableId); } } @@ -2310,7 +2318,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv } // Remove source parts from the source table - Database->Replace(src, { }, *srcSubset); + Database->Replace(src, *srcSubset, { }, { }); const auto logicResult = CompactionLogic->RemovedParts(src, labels); @@ -2342,6 +2350,7 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv Database->Merge(dst, partView); CompactionLogic->BorrowedPart(dst, partView); } + Database->MergeDone(dst); // Serialize rebased parts as moved from the source table NKikimrExecutorFlat::TTablePartSwitch proto; @@ -3050,7 +3059,7 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable:: TAutoPtr<NTable::TSubset> subset; if (params) { - subset = Database->Subset(table, { }, params->Edge.Head); + subset = Database->CompactionSubset(table, params->Edge.Head, { }); if (params->Parts) { subset->Flatten.insert(subset->Flatten.end(), params->Parts.begin(), params->Parts.end()); @@ -3397,8 +3406,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled) newParts.emplace_back(result.Part); } - Database->Replace(tableId, newParts, *ops->Subset); - Database->ReplaceTxStatus(tableId, newTxStatus, *ops->Subset); + Database->Replace(tableId, *ops->Subset, newParts, newTxStatus); TVector<TLogoBlobID> bundles(Reserve(ops->Subset->Flatten.size() + ops->Subset->ColdParts.size())); for (auto &part: ops->Subset->Flatten) { @@ -4525,23 +4533,28 @@ ui64 TExecutor::BeginCompaction(THolder<NTable::TCompactionParams> params) if (!memTableSnapshot->GetCommittedTransactions().empty() || !memTableSnapshot->GetRemovedTransactions().empty()) { // We must compact tx status when mem table has changes compactTxStatus = true; + break; } } for (const auto& txStatus : snapshot->Subset->TxStatus) { if (txStatus->Label.TabletID() != Owner->TabletID()) { // We want to compact borrowed tx status compactTxStatus = true; + break; } } + if (snapshot->Subset->TxStatus && snapshot->Subset->GarbageTransactions) { + // We want to remove garbage transactions + compactTxStatus = true; + } if (compactTxStatus) { - comp->CommittedTransactions = snapshot->Subset->CommittedTransactions; - comp->RemovedTransactions = snapshot->Subset->RemovedTransactions; comp->Frozen.reserve(snapshot->Subset->Frozen.size()); for (auto& memTableSnapshot : snapshot->Subset->Frozen) { comp->Frozen.push_back(memTableSnapshot.MemTable); } comp->TxStatus = snapshot->Subset->TxStatus; + comp->GarbageTransactions = snapshot->Subset->GarbageTransactions; } else { // We are not compacting tx status, avoid deleting current blobs snapshot->Subset->TxStatus.clear(); diff --git a/ydb/core/tablet_flat/flat_executor_misc.h b/ydb/core/tablet_flat/flat_executor_misc.h index 4f92584078..bca5f23c64 100644 --- a/ydb/core/tablet_flat/flat_executor_misc.h +++ b/ydb/core/tablet_flat/flat_executor_misc.h @@ -34,12 +34,11 @@ namespace NTabletFlatExecutor { THolder<NTable::TCompactionParams> Params; NTable::TRowVersionRanges::TSnapshot RemovedRowVersions; - // Non-empty when compaction also needs to write a tx status table part - NTable::TTransactionMap CommittedTransactions; - NTable::TTransactionSet RemovedTransactions; - // The above may contain extra keys, these allow them to be narrowed + // Non-empty when compaction also needs to produce a tx status table part TVector<TIntrusiveConstPtr<NTable::TMemTable>> Frozen; TVector<TIntrusiveConstPtr<NTable::TTxStatusPart>> TxStatus; + // Non-empty for transactions that no longer need their status maintained + NTable::TTransactionSet GarbageTransactions; }; } diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp index 0df3e2b59e..ac8ba17da0 100644 --- a/ydb/core/tablet_flat/flat_executor_ut.cpp +++ b/ydb/core/tablet_flat/flat_executor_ut.cpp @@ -3984,6 +3984,18 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTx) { const TIntrusiveConstPtr<TCompactionPolicy> Policy; }; + struct TTxWaitCompleted : public ITransaction { + TTxWaitCompleted() = default; + + bool Execute(TTransactionContext&, const TActorContext&) override { + return true; + } + + void Complete(const TActorContext& ctx) override { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } + }; + struct TTxCommitLongTx : public ITransaction { ui64 TxId; TRowVersion RowVersion; @@ -4004,6 +4016,26 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTx) { } }; + struct TTxHasTxData : public ITransaction { + const ui64 TxId; + bool& Result; + + explicit TTxHasTxData(ui64 txId, bool& result) + : TxId(txId) + , Result(result) + {} + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + Result = txc.DB.HasTxData(TableId, TxId); + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } + }; + template<ui32 ColumnId> struct TTxWriteRow : public ITransaction { i64 Key; @@ -4712,6 +4744,68 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTx) { } } + Y_UNIT_TEST(CompactedTxIdReuse) { + TMyEnvBase env; + + env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG); + env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); + + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { + return new TTestFlatTablet(env.Edge, tablet, info); + }); + env.WaitForWakeUp(); + + env.SendSync(new NFake::TEvExecute{ new TTxInitSchema }); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(1, "foo", 123) }); + env.SendSync(new NFake::TEvExecute{ new TTxCommitLongTx(123) }); + env.SendSync(new NFake::TEvExecute{ new TTxWaitCompleted() }); + + bool hasTxData = false; + env.SendSync(new NFake::TEvExecute{ new TTxHasTxData(123, hasTxData) }); + UNIT_ASSERT_C(hasTxData, "Expected HasTxData = true before compaction"); + + // Compact until executor tells us txId no longer has any data + for (int i = 0; i < 2 && hasTxData; ++i) { + // Force a mem table compaction + Cerr << "...compacting" << Endl; + env.SendSync(new NFake::TEvCompact(TableId, true)); + Cerr << "...waiting until compacted" << Endl; + env.WaitFor<NFake::TEvCompacted>(); + env.SendSync(new NFake::TEvExecute{ new TTxHasTxData(123, hasTxData) }); + Cerr << "...hasTxData = " << hasTxData << Endl; + } + UNIT_ASSERT_C(!hasTxData, "Expected HasTxData = false after multiple compactions"); + + // Add an additional row + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(2, "bar", 123) }); + env.SendSync(new NFake::TEvExecute{ new TTxWaitCompleted() }); + + // We must not see the uncommitted row + { + TString data; + env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) }); + UNIT_ASSERT_VALUES_EQUAL(data, + "Key 1 = Upsert value = Set foo value2 = Empty NULL\n"); + } + + Cerr << "...restarting tablet" << Endl; + env.SendSync(new TEvents::TEvPoison, false, true); + env.WaitForGone(); + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { + return new TTestFlatTablet(env.Edge, tablet, info); + }); + env.WaitForWakeUp(); + + // We shouldn't see uncommitted row after a reboot as well + { + Cerr << "... checking rows" << Endl; + TString data; + env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) }, /* retry */ true); + UNIT_ASSERT_VALUES_EQUAL(data, + "Key 1 = Upsert value = Set foo value2 = Empty NULL\n"); + } + } + } Y_UNIT_TEST_SUITE(TFlatTableExecutor_LongTxAndBlobs) { diff --git a/ydb/core/tablet_flat/flat_mem_warm.h b/ydb/core/tablet_flat/flat_mem_warm.h index 0abcf84acd..b12efed491 100644 --- a/ydb/core/tablet_flat/flat_mem_warm.h +++ b/ydb/core/tablet_flat/flat_mem_warm.h @@ -460,7 +460,8 @@ namespace NMem { return TxIdStats; } - void CommitTx(ui64 txId, TRowVersion rowVersion) { + bool CommitTx(ui64 txId, TRowVersion rowVersion) { + bool newRef = false; auto it = Committed.find(txId); bool toInsert = (it == Committed.end()); @@ -480,12 +481,16 @@ namespace NMem { UndoBuffer.push_back(TUndoOpInsertRemoved{ txId }); } Removed.erase(itRemoved); + } else { + newRef = true; } } } + return newRef; } - void RemoveTx(ui64 txId) { + bool RemoveTx(ui64 txId) { + bool newRef = false; auto it = Committed.find(txId); if (it == Committed.end()) { auto itRemoved = Removed.find(txId); @@ -494,8 +499,10 @@ namespace NMem { UndoBuffer.push_back(TUndoOpEraseRemoved{ txId }); } Removed.insert(txId); + newRef = true; } } + return newRef; } const absl::flat_hash_map<ui64, TRowVersion>& GetCommittedTransactions() const { diff --git a/ydb/core/tablet_flat/flat_ops_compact.h b/ydb/core/tablet_flat/flat_ops_compact.h index ce7be59a0c..a2adf6a025 100644 --- a/ydb/core/tablet_flat/flat_ops_compact.h +++ b/ydb/core/tablet_flat/flat_ops_compact.h @@ -255,40 +255,61 @@ namespace NTabletFlatExecutor { void WriteTxStatus() noexcept { - if (!Conf->CommittedTransactions && !Conf->RemovedTransactions) { - // Nothing to write - return; + if (!Conf->Frozen && !Conf->TxStatus) { + // Nothing to compact } - THashSet<ui64> txFilter; + absl::flat_hash_map<ui64, std::optional<TRowVersion>> status; + auto mergeStatus = [&](ui64 txId, const std::optional<TRowVersion>& version) { + if (Conf->GarbageTransactions.Contains(txId)) { + // We don't write garbage transactions + return; + } + auto it = status.find(txId); + if (it == status.end()) { + status[txId] = version; + } else if (version) { + if (!it->second) { + // commit wins over remove + it->second = version; + } else if (*version < *it->second) { + // lowest commit version wins + it->second = version; + } + } + }; + for (const auto& memTable : Conf->Frozen) { for (const auto& pr : memTable->GetCommittedTransactions()) { - txFilter.insert(pr.first); + mergeStatus(pr.first, pr.second); } for (const ui64 txId : memTable->GetRemovedTransactions()) { - txFilter.insert(txId); + mergeStatus(txId, std::nullopt); } } for (const auto& txStatus : Conf->TxStatus) { for (const auto& item : txStatus->TxStatusPage->GetCommittedItems()) { - txFilter.insert(item.GetTxId()); + mergeStatus(item.GetTxId(), item.GetRowVersion()); } for (const auto& item : txStatus->TxStatusPage->GetRemovedItems()) { - txFilter.insert(item.GetTxId()); + mergeStatus(item.GetTxId(), std::nullopt); } } - NTable::NPage::TTxStatusBuilder builder; - for (const auto& pr : Conf->CommittedTransactions) { - if (txFilter.contains(pr.first)) { - builder.AddCommitted(pr.first, pr.second); - } + if (status.empty()) { + // Nothing to write + return; } - for (const ui64 txId : Conf->RemovedTransactions) { - if (txFilter.contains(txId)) { - builder.AddRemoved(txId); + + NTable::NPage::TTxStatusBuilder builder; + for (const auto& pr : status) { + if (pr.second) { + builder.AddCommitted(pr.first, *pr.second); + } else { + builder.AddRemoved(pr.first); } } + auto data = builder.Finish(); if (!data) { // Don't write an empty page diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index d0acb0baf1..18519041ec 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -49,14 +49,18 @@ void TTable::RollbackChanges() struct TApplyRollbackOp { TTable* Self; - void operator()(const TRollbackRemoveTxRef& op) const { - auto it = Self->TxRefs.find(op.TxId); - Y_ABORT_UNLESS(it != Self->TxRefs.end()); + void operator()(const TRollbackRemoveTxDataRef& op) const { + auto it = Self->TxDataRefs.find(op.TxId); + Y_ABORT_UNLESS(it != Self->TxDataRefs.end()); if (0 == --it->second) { - Self->TxRefs.erase(it); + Self->TxDataRefs.erase(it); } } + void operator()(const TRollbackRemoveTxStatusRef& op) const { + Self->RemoveTxStatusRef(op.TxId); + } + void operator()(const TRollbackAddCommittedTx& op) const { Self->CommittedTransactions.Add(op.TxId, op.RowVersion); } @@ -210,7 +214,7 @@ TIntrusiveConstPtr<TRowScheme> TTable::GetScheme() const noexcept return Scheme; } -TAutoPtr<TSubset> TTable::Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch head) +TAutoPtr<TSubset> TTable::CompactionSubset(TEpoch head, TArrayRef<const TLogoBlobID> bundle) { head = Min(head, Epoch); @@ -245,6 +249,49 @@ TAutoPtr<TSubset> TTable::Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch hea subset->CommittedTransactions = CommittedTransactions; subset->RemovedTransactions = RemovedTransactions; + subset->GarbageTransactions = GarbageTransactions; + + return subset; +} + +TAutoPtr<TSubset> TTable::PartSwitchSubset(TEpoch head, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus) +{ + head = Min(head, Epoch); + + TAutoPtr<TSubset> subset = new TSubset(head, Scheme); + + if (head > TEpoch::Zero()) { + for (auto &x : Frozen) { + if (x->Epoch < head) { + subset->Frozen.emplace_back(x, x->Immediate()); + } + } + if (MutableBackup && MutableBackup->Epoch < head) { + subset->Frozen.emplace_back(MutableBackup, MutableBackup->Immediate()); + } + } + + subset->Flatten.reserve(bundle.size()); + for (const TLogoBlobID &token : bundle) { + if (auto* c = ColdParts.FindPtr(token)) { + subset->ColdParts.push_back(*c); + continue; + } + auto* p = Flatten.FindPtr(token); + Y_VERIFY_S(p, "Cannot find part " << token); + subset->Flatten.push_back(*p); + } + + subset->TxStatus.reserve(txStatus.size()); + for (const TLogoBlobID &token : txStatus) { + auto* p = TxStatus.FindPtr(token); + Y_VERIFY_S(p, "Cannot find tx status " << token); + subset->TxStatus.push_back(*p); + } + + subset->CommittedTransactions = CommittedTransactions; + subset->RemovedTransactions = RemovedTransactions; + subset->GarbageTransactions = GarbageTransactions; return subset; } @@ -281,6 +328,7 @@ TAutoPtr<TSubset> TTable::Subset(TEpoch head) const noexcept // However it can still theoretically be used for iteration or compaction subset->CommittedTransactions = CommittedTransactions; subset->RemovedTransactions = RemovedTransactions; + subset->GarbageTransactions = GarbageTransactions; return subset; } @@ -346,8 +394,7 @@ TAutoPtr<TSubset> TTable::Unwrap() noexcept auto subset = Subset(TEpoch::Max()); - Replace({ }, *subset); - ReplaceTxStatus({ }, *subset); + Replace(*subset, { }, { }); Y_ABORT_UNLESS(!(Flatten || Frozen || Mutable || TxStatus)); @@ -384,11 +431,14 @@ void TTable::ReplaceSlices(TBundleSlicesMap slices) noexcept } } -void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset) noexcept +void TTable::Replace( + const TSubset& subset, + TArrayRef<const TPartView> newParts, + TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus) noexcept { Y_ABORT_UNLESS(!RollbackState, "Cannot perform this in a transaction"); - for (const auto &partView : partViews) { + for (const auto& partView : newParts) { Y_ABORT_UNLESS(partView, "Replace(...) shouldn't get empty parts"); Y_ABORT_UNLESS(!partView.Screen, "Replace(...) shouldn't get screened parts"); Y_ABORT_UNLESS(partView.Slices && *partView.Slices, "Got parts without slices"); @@ -403,9 +453,13 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset bool removingOld = false; bool addingNew = false; - THashSet<ui64> checkNewTransactions; - for (auto &memTable : subset.Frozen) { + // Note: we remove old parts first and add new ones next + // Refcount cannot become zero more than once so vectors are unique + std::vector<ui64> checkTxDataRefs; + std::vector<ui64> checkTxStatusRefs; + + for (auto& memTable : subset.Frozen) { removingOld = true; const auto found = Frozen.erase(memTable.MemTable); @@ -418,10 +472,27 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset for (const auto &pr : memTable.MemTable->GetTxIdStats()) { const ui64 txId = pr.first; - auto& count = TxRefs.at(txId); + auto& count = TxDataRefs.at(txId); Y_ABORT_UNLESS(count > 0); if (0 == --count) { - checkNewTransactions.insert(txId); + checkTxDataRefs.push_back(txId); + } + } + + for (const auto &pr : memTable.MemTable->GetCommittedTransactions()) { + const ui64 txId = pr.first; + auto& count = TxStatusRefs.at(txId); + Y_ABORT_UNLESS(count > 0); + if (0 == --count) { + checkTxStatusRefs.push_back(txId); + } + } + + for (ui64 txId : memTable.MemTable->GetRemovedTransactions()) { + auto& count = TxStatusRefs.at(txId); + Y_ABORT_UNLESS(count > 0); + if (0 == --count) { + checkTxStatusRefs.push_back(txId); } } } @@ -458,10 +529,10 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset if (existing->TxIdStats) { for (const auto& item : existing->TxIdStats->GetItems()) { const ui64 txId = item.GetTxId(); - auto& count = TxRefs.at(txId); + auto& count = TxDataRefs.at(txId); Y_ABORT_UNLESS(count > 0); if (0 == --count) { - checkNewTransactions.insert(txId); + checkTxDataRefs.push_back(txId); } } } @@ -482,7 +553,33 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset ColdParts.erase(it); } - for (const auto &partView : partViews) { + for (auto& part : subset.TxStatus) { + removingOld = true; + Y_ABORT_UNLESS(part, "Unexpected empty TTxStatusPart in TSubset"); + + auto it = TxStatus.find(part->Label); + Y_ABORT_UNLESS(it != TxStatus.end()); + TxStatus.erase(it); + + for (auto& item : part->TxStatusPage->GetCommittedItems()) { + const ui64 txId = item.GetTxId(); + auto& count = TxStatusRefs.at(txId); + Y_ABORT_UNLESS(count > 0); + if (0 == --count) { + checkTxStatusRefs.push_back(txId); + } + } + for (auto& item : part->TxStatusPage->GetRemovedItems()) { + const ui64 txId = item.GetTxId(); + auto& count = TxStatusRefs.at(txId); + Y_ABORT_UNLESS(count > 0); + if (0 == --count) { + checkTxStatusRefs.push_back(txId); + } + } + } + + for (const auto &partView : newParts) { addingNew = true; if (Mutable && partView->Epoch >= Mutable->Epoch) { Y_Fail("Replace with " << NFmt::Do(*partView) << " after mutable epoch " << Mutable->Epoch); @@ -497,20 +594,59 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset AddSafe(partView); } - for (ui64 txId : checkNewTransactions) { - auto it = TxRefs.find(txId); - Y_ABORT_UNLESS(it != TxRefs.end()); + for (const auto& txStatus : newTxStatus) { + if (Mutable && txStatus->Epoch >= Mutable->Epoch) { + Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after mutable epoch " << Mutable->Epoch); + } + + if (Frozen && txStatus->Epoch >= (*Frozen.begin())->Epoch) { + Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after frozen epoch " << (*Frozen.begin())->Epoch); + } + + Epoch = Max(Epoch, txStatus->Epoch + 1); + + auto res = TxStatus.emplace(txStatus->Label, txStatus); + Y_ABORT_UNLESS(res.second, "Unexpected failure to add a new TTxStatusPart"); + + for (auto& item : txStatus->TxStatusPage->GetCommittedItems()) { + const ui64 txId = item.GetTxId(); + ++TxStatusRefs[txId]; + } + for (auto& item : txStatus->TxStatusPage->GetRemovedItems()) { + const ui64 txId = item.GetTxId(); + ++TxStatusRefs[txId]; + } + } + + for (ui64 txId : checkTxDataRefs) { + auto it = TxDataRefs.find(txId); + Y_ABORT_UNLESS(it != TxDataRefs.end()); if (it->second == 0) { - // Transaction no longer needs to be tracked + // Transaction no longer has any known rows + TxDataRefs.erase(it); + OpenTxs.erase(txId); if (!ColdParts) { - CommittedTransactions.Remove(txId); - RemovedTransactions.Remove(txId); - DecidedTransactions.Remove(txId); + GarbageTransactions.Add(txId); } else { CheckTransactions.insert(txId); } - TxRefs.erase(it); - OpenTxs.erase(txId); + } + } + + for (ui64 txId : checkTxStatusRefs) { + auto it = TxStatusRefs.find(txId); + Y_ABORT_UNLESS(it != TxStatusRefs.end()); + if (it->second == 0) { + // This transaction no longer has any known status + TxStatusRefs.erase(it); + CommittedTransactions.Remove(txId); + RemovedTransactions.Remove(txId); + DecidedTransactions.Remove(txId); + GarbageTransactions.Remove(txId); + if (TxDataRefs.contains(txId)) { + // In the unlikely case it has some data it is now open + OpenTxs.insert(txId); + } } } @@ -526,34 +662,6 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset } } -void TTable::ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus, const TSubset &subset) noexcept -{ - Y_ABORT_UNLESS(!RollbackState, "Cannot perform this in a transaction"); - - for (auto &part : subset.TxStatus) { - Y_ABORT_UNLESS(part, "Unexpected empty TTxStatusPart in TSubset"); - - auto it = TxStatus.find(part->Label); - Y_ABORT_UNLESS(it != TxStatus.end()); - TxStatus.erase(it); - } - - for (const auto& txStatus : newTxStatus) { - if (Mutable && txStatus->Epoch >= Mutable->Epoch) { - Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after mutable epoch " << Mutable->Epoch); - } - - if (Frozen && txStatus->Epoch >= (*Frozen.begin())->Epoch) { - Y_Fail("Replace with " << NFmt::Do(*txStatus) << " after frozen epoch " << (*Frozen.begin())->Epoch); - } - - Epoch = Max(Epoch, txStatus->Epoch + 1); - - auto res = TxStatus.emplace(txStatus->Label, txStatus); - Y_ABORT_UNLESS(res.second, "Unexpected failure to add a new TTxStatusPart"); - } -} - void TTable::Merge(TPartView partView) noexcept { Y_ABORT_UNLESS(!RollbackState, "Cannot perform this in a transaction"); @@ -639,9 +747,10 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept } } } - if (!TxRefs.contains(txId)) { + if (!TxDataRefs.contains(txId)) { CheckTransactions.insert(txId); } + ++TxStatusRefs[txId]; DecidedTransactions.Add(txId); OpenTxs.erase(txId); } @@ -654,9 +763,10 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept // This is not an error in some cases, but may be suspicious RemovedCommittedTxs++; } - if (!TxRefs.contains(txId)) { + if (!TxDataRefs.contains(txId)) { CheckTransactions.insert(txId); } + ++TxStatusRefs[txId]; DecidedTransactions.Add(txId); OpenTxs.erase(txId); } @@ -679,15 +789,18 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept // eventuality, so doesn't need to be invalidated. } +void TTable::MergeDone() noexcept +{ + ProcessCheckTransactions(); +} + void TTable::ProcessCheckTransactions() noexcept { if (!ColdParts) { for (ui64 txId : CheckTransactions) { - auto it = TxRefs.find(txId); - if (it == TxRefs.end()) { - CommittedTransactions.Remove(txId); - RemovedTransactions.Remove(txId); - DecidedTransactions.Remove(txId); + auto it = TxDataRefs.find(txId); + if (it == TxDataRefs.end()) { + GarbageTransactions.Add(txId); } } CheckTransactions.clear(); @@ -784,8 +897,8 @@ void TTable::AddSafe(TPartView partView) if (partView->TxIdStats) { for (const auto& item : partView->TxIdStats->GetItems()) { const ui64 txId = item.GetTxId(); - const auto newCount = ++TxRefs[txId]; - if (newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId)) { + const auto newCount = ++TxDataRefs[txId]; + if (newCount == 1 && !TxStatusRefs.contains(txId)) { OpenTxs.insert(txId); } } @@ -885,10 +998,10 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemG } } -void TTable::AddTxRef(ui64 txId) +void TTable::AddTxDataRef(ui64 txId) { - const auto newCount = ++TxRefs[txId]; - const bool addOpenTx = newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId); + const auto newCount = ++TxDataRefs[txId]; + const bool addOpenTx = newCount == 1 && !TxStatusRefs.contains(txId); if (addOpenTx) { auto res = OpenTxs.insert(txId); Y_ABORT_UNLESS(res.second); @@ -896,17 +1009,47 @@ void TTable::AddTxRef(ui64 txId) "Decided transaction %" PRIu64 " is both open and decided", txId); } if (RollbackState) { - RollbackOps.emplace_back(TRollbackRemoveTxRef{ txId }); + RollbackOps.emplace_back(TRollbackRemoveTxDataRef{ txId }); if (addOpenTx) { RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId }); } } } +void TTable::AddTxStatusRef(ui64 txId) +{ + ++TxStatusRefs[txId]; + if (RollbackState) { + RollbackOps.emplace_back(TRollbackRemoveTxStatusRef{ txId }); + } +} + +void TTable::RemoveTxStatusRef(ui64 txId) +{ + auto it = TxStatusRefs.find(txId); + Y_ABORT_UNLESS(it != TxStatusRefs.end()); + Y_ABORT_UNLESS(it->second > 0); + if (0 == --it->second) { + // This was the last reference + TxStatusRefs.erase(it); + // Remove the corresponding committed/removed record + CommittedTransactions.Remove(txId); + RemovedTransactions.Remove(txId); + Y_DEBUG_ABORT_UNLESS(!GarbageTransactions.Contains(txId), + "Garbage transaction %" PRIu64 " has no status", txId); + Y_DEBUG_ABORT_UNLESS(!DecidedTransactions.Contains(txId), + "Decided transaction %" PRIu64 " has no status", txId); + // Transactions with data but without tx status are open + if (TxDataRefs.contains(txId)) { + OpenTxs.insert(txId); + } + } +} + void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, ui64 txId) { auto& memTable = MemTable(); - bool hadTxRef = memTable.GetTxIdStats().contains(txId); + bool hadTxDataRef = memTable.GetTxIdStats().contains(txId); if (ErasedKeysCache) { const TCelled cells(key, *Scheme->Keys, true); @@ -920,11 +1063,11 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMe TRowVersion rowVersion(Max<ui64>(), txId); MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions); - if (!hadTxRef) { + if (!hadTxDataRef) { Y_DEBUG_ABORT_UNLESS(memTable.GetTxIdStats().contains(txId)); - AddTxRef(txId); + AddTxDataRef(txId); } else { - Y_DEBUG_ABORT_UNLESS(TxRefs[txId] > 0); + Y_DEBUG_ABORT_UNLESS(TxDataRefs[txId] > 0); } if (TableObserver) { @@ -935,7 +1078,9 @@ void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMe void TTable::CommitTx(ui64 txId, TRowVersion rowVersion) { // TODO: track suspicious transactions (not open at commit time) - MemTable().CommitTx(txId, rowVersion); + if (MemTable().CommitTx(txId, rowVersion)) { + AddTxStatusRef(txId); + } // Note: it is possible to have multiple CommitTx for the same TxId but at // different row versions. The commit with the minimum row version wins. @@ -975,7 +1120,9 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion) void TTable::RemoveTx(ui64 txId) { // TODO: track suspicious transactions (not open at remove time) - MemTable().RemoveTx(txId); + if (MemTable().RemoveTx(txId)) { + AddTxStatusRef(txId); + } // Note: it is possible to have both CommitTx and RemoveTx for the same TxId // due to complicated split/merge shard interactions. The commit actually @@ -1008,7 +1155,7 @@ bool TTable::HasOpenTx(ui64 txId) const bool TTable::HasTxData(ui64 txId) const { - return TxRefs.contains(txId); + return TxDataRefs.contains(txId) || TxStatusRefs.contains(txId); } bool TTable::HasCommittedTx(ui64 txId) const @@ -1033,7 +1180,7 @@ size_t TTable::GetOpenTxCount() const size_t TTable::GetTxsWithDataCount() const { - return TxRefs.size(); + return TxDataRefs.size(); } size_t TTable::GetCommittedTxCount() const @@ -1050,7 +1197,7 @@ TTableRuntimeStats TTable::RuntimeStats() const noexcept { return TTableRuntimeStats{ .OpenTxCount = OpenTxs.size(), - .TxsWithDataCount = TxRefs.size(), + .TxsWithDataCount = TxDataRefs.size() + GarbageTransactions.Size(), .CommittedTxCount = CommittedTransactions.Size(), .RemovedTxCount = RemovedTransactions.Size(), .RemovedCommittedTxs = RemovedCommittedTxs, diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 493a5ed40d..405ab7a109 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -84,7 +84,8 @@ public: return Epoch; } - TAutoPtr<TSubset> Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch edge); + TAutoPtr<TSubset> CompactionSubset(TEpoch edge, TArrayRef<const TLogoBlobID> bundle); + TAutoPtr<TSubset> PartSwitchSubset(TEpoch edge, TArrayRef<const TLogoBlobID> bundle, TArrayRef<const TLogoBlobID> txStatus); TAutoPtr<TSubset> Subset(TEpoch edge) const noexcept; TAutoPtr<TSubset> ScanSnapshot(TRowVersion snapshot = TRowVersion::Max()) noexcept; TAutoPtr<TSubset> Unwrap() noexcept; /* full Subset(..) + final Replace(..) */ @@ -110,8 +111,7 @@ public: be displaced from table with Clean() method eventually. */ - void Replace(TArrayRef<const TPartView>, const TSubset&) noexcept; - void ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>, const TSubset&) noexcept; + void Replace(const TSubset&, TArrayRef<const TPartView>, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>) noexcept; /*_ Special interface for clonig flatten part of table for outer usage. Cook some TPartView with Subset(...) method and/or TShrink tool first and @@ -121,6 +121,7 @@ public: void Merge(TPartView partView) noexcept; void Merge(TIntrusiveConstPtr<TColdPart> part) noexcept; void Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept; + void MergeDone() noexcept; void ProcessCheckTransactions() noexcept; /** @@ -339,7 +340,9 @@ private: void RemoveStat(const TPartView& partView); private: - void AddTxRef(ui64 txId); + void AddTxDataRef(ui64 txId); + void AddTxStatusRef(ui64 txId); + void RemoveTxStatusRef(ui64 txId); private: TEpoch Epoch; /* Monotonic table change number, with holes */ @@ -361,18 +364,38 @@ private: TRowVersionRanges RemovedRowVersions; - absl::flat_hash_map<ui64, size_t> TxRefs; + // The number of entities (memtable/sst) that have rows with a TxId. As + // long as there is at least one row with a TxId its commit/remove status + // must be preserved. + absl::flat_hash_map<ui64, size_t> TxDataRefs; + + // The number of entities (memtable/txstatus) that have a commit/remove + // status for a TxId. As long as there is at least one such entity the + // transaction cannot be used again without artifacts, and must stay + // in committed/removed set. + absl::flat_hash_map<ui64, size_t> TxStatusRefs; + + // A set of open transactions, i.e. transactions that have rows with the + // specified TxId and that have not been committed or removed yet. absl::flat_hash_set<ui64> OpenTxs; + + // A set of transactions that need to be re-checked after a merge. absl::flat_hash_set<ui64> CheckTransactions; + TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; TTransactionSet DecidedTransactions; + TTransactionSet GarbageTransactions; TIntrusivePtr<ITableObserver> TableObserver; ui64 RemovedCommittedTxs = 0; private: - struct TRollbackRemoveTxRef { + struct TRollbackRemoveTxDataRef { + ui64 TxId; + }; + + struct TRollbackRemoveTxStatusRef { ui64 TxId; }; @@ -402,7 +425,8 @@ private: }; using TRollbackOp = std::variant< - TRollbackRemoveTxRef, + TRollbackRemoveTxDataRef, + TRollbackRemoveTxStatusRef, TRollbackAddCommittedTx, TRollbackRemoveCommittedTx, TRollbackAddRemovedTx, diff --git a/ydb/core/tablet_flat/flat_table_subset.h b/ydb/core/tablet_flat/flat_table_subset.h index 0c7355abfa..fca40ac93b 100644 --- a/ydb/core/tablet_flat/flat_table_subset.h +++ b/ydb/core/tablet_flat/flat_table_subset.h @@ -118,6 +118,7 @@ namespace NTable { TVector<TIntrusiveConstPtr<TColdPart>> ColdParts; TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; + TTransactionSet GarbageTransactions; TVector<TIntrusiveConstPtr<TTxStatusPart>> TxStatus; }; diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h index 5cfe807b00..47d3526b4d 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h +++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h @@ -246,9 +246,12 @@ namespace NTest { if (last /* make full subset */) { subset = Base->Subset(table, TEpoch::Max(), { }, { }); } else /* only flush memtables */ { - subset = Base->Subset(table, { }, TEpoch::Max()); + subset = Base->CompactionSubset(table, TEpoch::Max(), { }); } + // Note: we don't compact TxStatus in these tests + Y_ABORT_UNLESS(subset->TxStatus.empty()); + TLogoBlobID logo(1, Gen, ++Step, 1, 0, 0); auto *family = Base->GetScheme().DefaultFamilyFor(table); @@ -279,7 +282,7 @@ namespace NTest { for (auto &part : eggs.Parts) partViews.push_back({ part, nullptr, part->Slices }); - Base->Replace(table, std::move(partViews), *subset); + Base->Replace(table, *subset, std::move(partViews), { }); return *this; } diff --git a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h index 96b97e01f6..be28e08680 100644 --- a/ydb/core/tablet_flat/ut/flat_comp_ut_common.h +++ b/ydb/core/tablet_flat/ut/flat_comp_ut_common.h @@ -125,11 +125,14 @@ public: SnapshotTable(params->Table); } - auto subset = DB.Subset(params->Table, { }, params->Edge.Head); + auto subset = DB.CompactionSubset(params->Table, params->Edge.Head, { }); if (params->Parts) { subset->Flatten.insert(subset->Flatten.end(), params->Parts.begin(), params->Parts.end()); } + // Note: we don't compact TxStatus in these tests + Y_ABORT_UNLESS(subset->TxStatus.empty()); + Y_ABORT_UNLESS(!*subset || subset->IsStickedToHead()); const auto& scheme = DB.GetScheme(); @@ -163,8 +166,7 @@ public: Y_ABORT_UNLESS(parts.back()); } - auto partsCopy = parts; - DB.Replace(params->Table, partsCopy, *subset); + DB.Replace(params->Table, *subset, parts, { }); return MakeHolder<TCompactionResult>(subset->Epoch(), std::move(parts)); } |