diff options
author | snaury <[email protected]> | 2022-08-16 10:57:09 +0300 |
---|---|---|
committer | snaury <[email protected]> | 2022-08-16 10:57:09 +0300 |
commit | c5d032b79b690a0f45daa421e1c2cd239206b968 (patch) | |
tree | a2ecd85f5516fe903d5dfd9ee3c9bc4ebd2cb6b5 | |
parent | d7efecffef1a3d70647a2bb84dd1105cdc3d396a (diff) |
Reduce overhead per uncommitted transaction,
-rw-r--r-- | ydb/core/tablet_flat/flat_database.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.h | 1 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.cpp | 85 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.h | 22 |
4 files changed, 50 insertions, 63 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index bc106e823e6..29761dfc8d2 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -307,6 +307,11 @@ bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const return Require(table)->HasOpenTx(txId); } +bool TDatabase::HasTxData(ui32 table, ui64 txId) const +{ + return Require(table)->HasTxData(txId); +} + bool TDatabase::HasCommittedTx(ui32 table, ui64 txId) const { return Require(table)->HasCommittedTx(txId); diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 06740064e3a..496a6aed34f 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -118,6 +118,7 @@ public: * Returns true when table has an open transaction that is not committed or removed yet */ bool HasOpenTx(ui32 table, ui64 txId) const; + bool HasTxData(ui32 table, ui64 txId) const; bool HasCommittedTx(ui32 table, ui64 txId) const; bool HasRemovedTx(ui32 table, ui64 txId) const; diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index ca3f4de6322..7d8fef3da4c 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -41,12 +41,12 @@ void TTable::RollbackChanges() struct TApplyRollbackOp { TTable* Self; - void operator()(const TRollbackRemoveOpenTx& op) const { - Self->OpenTransactions.erase(op.TxId); - } - - void operator()(const TRollbackRemoveOpenTxMem& op) const { - Self->OpenTransactions[op.TxId].Mem.erase(op.Mem); + void operator()(const TRollbackRemoveTxRef& op) const { + auto it = Self->TxRefs.find(op.TxId); + Y_VERIFY(it != Self->TxRefs.end()); + if (0 == --it->second) { + Self->TxRefs.erase(it); + } } void operator()(const TRollbackAddCommittedTx& op) const { @@ -389,10 +389,9 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset for (const auto &pr : memTable.MemTable->GetTxIdStats()) { const ui64 txId = pr.first; - auto& tx = OpenTransactions[txId]; - bool removed = tx.Mem.erase(memTable.MemTable); - Y_VERIFY(removed); - if (tx.Mem.empty() && tx.Parts.empty()) { + auto& count = TxRefs.at(txId); + Y_VERIFY(count > 0); + if (0 == --count) { checkNewTransactions.insert(txId); } } @@ -429,10 +428,9 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset if (existing->TxIdStats) { for (const auto& item : existing->TxIdStats->GetItems()) { const ui64 txId = item.GetTxId(); - auto& tx = OpenTransactions[txId]; - bool removed = tx.Parts.erase(existing.Part); - Y_VERIFY(removed); - if (tx.Mem.empty() && tx.Parts.empty()) { + auto& count = TxRefs.at(txId); + Y_VERIFY(count > 0); + if (0 == --count) { checkNewTransactions.insert(txId); } } @@ -468,10 +466,9 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset } for (ui64 txId : checkNewTransactions) { - auto it = OpenTransactions.find(txId); - Y_VERIFY(it != OpenTransactions.end()); - auto& tx = it->second; - if (tx.Mem.empty() && tx.Parts.empty()) { + auto it = TxRefs.find(txId); + Y_VERIFY(it != TxRefs.end()); + if (it->second == 0) { // Transaction no longer needs to be tracked if (!ColdParts) { CommittedTransactions.Remove(txId); @@ -479,7 +476,7 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset } else { CheckTransactions.insert(txId); } - OpenTransactions.erase(it); + TxRefs.erase(it); } } @@ -592,7 +589,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept RemovedTransactions.Remove(txId); } } - if (!OpenTransactions.contains(txId)) { + if (!TxRefs.contains(txId)) { CheckTransactions.insert(txId); } } @@ -601,7 +598,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev)) { RemovedTransactions.Add(txId); } - if (!OpenTransactions.contains(txId)) { + if (!TxRefs.contains(txId)) { CheckTransactions.insert(txId); } } @@ -626,8 +623,8 @@ void TTable::ProcessCheckTransactions() noexcept { if (!ColdParts) { for (ui64 txId : CheckTransactions) { - auto it = OpenTransactions.find(txId); - if (it == OpenTransactions.end()) { + auto it = TxRefs.find(txId); + if (it == TxRefs.end()) { CommittedTransactions.Remove(txId); RemovedTransactions.Remove(txId); } @@ -726,7 +723,7 @@ void TTable::AddSafe(TPartView partView) if (partView->TxIdStats) { for (const auto& item : partView->TxIdStats->GetItems()) { const ui64 txId = item.GetTxId(); - OpenTransactions[txId].Parts.insert(partView.Part); + ++TxRefs[txId]; } } @@ -821,37 +818,28 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemG MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions); } -TTable::TOpenTransaction& TTable::AddOpenTransaction(ui64 txId) +void TTable::AddTxRef(ui64 txId) { - TOpenTransactions::insert_ctx ctx = nullptr; - TOpenTransactions::iterator it = OpenTransactions.find(txId, ctx); - - if (it == OpenTransactions.end()) { - if (RollbackState) { - RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId }); - } - - it = OpenTransactions.emplace_direct( - ctx, - std::piecewise_construct, - std::forward_as_tuple(txId), - std::forward_as_tuple()); + ++TxRefs[txId]; + if (RollbackState) { + RollbackOps.emplace_back(TRollbackRemoveTxRef{ txId }); } - - return it->second; } void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, ui64 txId) { + auto& memTable = MemTable(); + bool hadTxRef = memTable.GetTxIdStats().contains(txId); + // Use a special row version that marks this update as uncommitted TRowVersion rowVersion(Max<ui64>(), txId); MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions); - Y_VERIFY_DEBUG(Mutable->GetTxIdStats().contains(txId)); - - auto& openTx = AddOpenTransaction(txId); - if (openTx.Mem.insert(Mutable).second && RollbackState) { - RollbackOps.emplace_back(TRollbackRemoveOpenTxMem{ txId, Mutable }); + if (!hadTxRef) { + Y_VERIFY_DEBUG(memTable.GetTxIdStats().contains(txId)); + AddTxRef(txId); + } else { + Y_VERIFY_DEBUG(TxRefs[txId] > 0); } } @@ -901,13 +889,18 @@ void TTable::RemoveTx(ui64 txId) bool TTable::HasOpenTx(ui64 txId) const { - if (OpenTransactions.contains(txId)) { + if (TxRefs.contains(txId)) { return !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId); } return false; } +bool TTable::HasTxData(ui64 txId) const +{ + return TxRefs.contains(txId); +} + bool TTable::HasCommittedTx(ui64 txId) const { return CommittedTransactions.Find(txId); diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 14f0ebf2622..1cc1e3cfdba 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -163,6 +163,7 @@ public: * Returns true when table has an open transaction that is not committed or removed yet */ bool HasOpenTx(ui64 txId) const; + bool HasTxData(ui64 txId) const; bool HasCommittedTx(ui64 txId) const; bool HasRemovedTx(ui64 txId) const; @@ -315,14 +316,7 @@ private: void RemoveStat(const TPartView& partView); private: - struct TOpenTransaction { - THashSet<TIntrusiveConstPtr<TMemTable>> Mem; - THashSet<TIntrusiveConstPtr<TPart>> Parts; - }; - - using TOpenTransactions = THashMap<ui64, TOpenTransaction>; - - TOpenTransaction& AddOpenTransaction(ui64 txId); + void AddTxRef(ui64 txId); private: TEpoch Epoch; /* Monotonic table change number, with holes */ @@ -343,19 +337,14 @@ private: TRowVersionRanges RemovedRowVersions; + THashMap<ui64, size_t> TxRefs; THashSet<ui64> CheckTransactions; - TOpenTransactions OpenTransactions; TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; private: - struct TRollbackRemoveOpenTx { - ui64 TxId; - }; - - struct TRollbackRemoveOpenTxMem { + struct TRollbackRemoveTxRef { ui64 TxId; - TIntrusiveConstPtr<TMemTable> Mem; }; struct TRollbackAddCommittedTx { @@ -376,8 +365,7 @@ private: }; using TRollbackOp = std::variant< - TRollbackRemoveOpenTx, - TRollbackRemoveOpenTxMem, + TRollbackRemoveTxRef, TRollbackAddCommittedTx, TRollbackRemoveCommittedTx, TRollbackAddRemovedTx, |