summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <[email protected]>2022-08-16 10:57:09 +0300
committersnaury <[email protected]>2022-08-16 10:57:09 +0300
commitc5d032b79b690a0f45daa421e1c2cd239206b968 (patch)
treea2ecd85f5516fe903d5dfd9ee3c9bc4ebd2cb6b5
parentd7efecffef1a3d70647a2bb84dd1105cdc3d396a (diff)
Reduce overhead per uncommitted transaction,
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp5
-rw-r--r--ydb/core/tablet_flat/flat_database.h1
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp85
-rw-r--r--ydb/core/tablet_flat/flat_table.h22
4 files changed, 50 insertions, 63 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index bc106e823e6..29761dfc8d2 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -307,6 +307,11 @@ bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const
return Require(table)->HasOpenTx(txId);
}
+bool TDatabase::HasTxData(ui32 table, ui64 txId) const
+{
+ return Require(table)->HasTxData(txId);
+}
+
bool TDatabase::HasCommittedTx(ui32 table, ui64 txId) const
{
return Require(table)->HasCommittedTx(txId);
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 06740064e3a..496a6aed34f 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -118,6 +118,7 @@ public:
* Returns true when table has an open transaction that is not committed or removed yet
*/
bool HasOpenTx(ui32 table, ui64 txId) const;
+ bool HasTxData(ui32 table, ui64 txId) const;
bool HasCommittedTx(ui32 table, ui64 txId) const;
bool HasRemovedTx(ui32 table, ui64 txId) const;
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index ca3f4de6322..7d8fef3da4c 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -41,12 +41,12 @@ void TTable::RollbackChanges()
struct TApplyRollbackOp {
TTable* Self;
- void operator()(const TRollbackRemoveOpenTx& op) const {
- Self->OpenTransactions.erase(op.TxId);
- }
-
- void operator()(const TRollbackRemoveOpenTxMem& op) const {
- Self->OpenTransactions[op.TxId].Mem.erase(op.Mem);
+ void operator()(const TRollbackRemoveTxRef& op) const {
+ auto it = Self->TxRefs.find(op.TxId);
+ Y_VERIFY(it != Self->TxRefs.end());
+ if (0 == --it->second) {
+ Self->TxRefs.erase(it);
+ }
}
void operator()(const TRollbackAddCommittedTx& op) const {
@@ -389,10 +389,9 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
for (const auto &pr : memTable.MemTable->GetTxIdStats()) {
const ui64 txId = pr.first;
- auto& tx = OpenTransactions[txId];
- bool removed = tx.Mem.erase(memTable.MemTable);
- Y_VERIFY(removed);
- if (tx.Mem.empty() && tx.Parts.empty()) {
+ auto& count = TxRefs.at(txId);
+ Y_VERIFY(count > 0);
+ if (0 == --count) {
checkNewTransactions.insert(txId);
}
}
@@ -429,10 +428,9 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
if (existing->TxIdStats) {
for (const auto& item : existing->TxIdStats->GetItems()) {
const ui64 txId = item.GetTxId();
- auto& tx = OpenTransactions[txId];
- bool removed = tx.Parts.erase(existing.Part);
- Y_VERIFY(removed);
- if (tx.Mem.empty() && tx.Parts.empty()) {
+ auto& count = TxRefs.at(txId);
+ Y_VERIFY(count > 0);
+ if (0 == --count) {
checkNewTransactions.insert(txId);
}
}
@@ -468,10 +466,9 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
}
for (ui64 txId : checkNewTransactions) {
- auto it = OpenTransactions.find(txId);
- Y_VERIFY(it != OpenTransactions.end());
- auto& tx = it->second;
- if (tx.Mem.empty() && tx.Parts.empty()) {
+ auto it = TxRefs.find(txId);
+ Y_VERIFY(it != TxRefs.end());
+ if (it->second == 0) {
// Transaction no longer needs to be tracked
if (!ColdParts) {
CommittedTransactions.Remove(txId);
@@ -479,7 +476,7 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
} else {
CheckTransactions.insert(txId);
}
- OpenTransactions.erase(it);
+ TxRefs.erase(it);
}
}
@@ -592,7 +589,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
RemovedTransactions.Remove(txId);
}
}
- if (!OpenTransactions.contains(txId)) {
+ if (!TxRefs.contains(txId)) {
CheckTransactions.insert(txId);
}
}
@@ -601,7 +598,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev)) {
RemovedTransactions.Add(txId);
}
- if (!OpenTransactions.contains(txId)) {
+ if (!TxRefs.contains(txId)) {
CheckTransactions.insert(txId);
}
}
@@ -626,8 +623,8 @@ void TTable::ProcessCheckTransactions() noexcept
{
if (!ColdParts) {
for (ui64 txId : CheckTransactions) {
- auto it = OpenTransactions.find(txId);
- if (it == OpenTransactions.end()) {
+ auto it = TxRefs.find(txId);
+ if (it == TxRefs.end()) {
CommittedTransactions.Remove(txId);
RemovedTransactions.Remove(txId);
}
@@ -726,7 +723,7 @@ void TTable::AddSafe(TPartView partView)
if (partView->TxIdStats) {
for (const auto& item : partView->TxIdStats->GetItems()) {
const ui64 txId = item.GetTxId();
- OpenTransactions[txId].Parts.insert(partView.Part);
+ ++TxRefs[txId];
}
}
@@ -821,37 +818,28 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemG
MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions);
}
-TTable::TOpenTransaction& TTable::AddOpenTransaction(ui64 txId)
+void TTable::AddTxRef(ui64 txId)
{
- TOpenTransactions::insert_ctx ctx = nullptr;
- TOpenTransactions::iterator it = OpenTransactions.find(txId, ctx);
-
- if (it == OpenTransactions.end()) {
- if (RollbackState) {
- RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId });
- }
-
- it = OpenTransactions.emplace_direct(
- ctx,
- std::piecewise_construct,
- std::forward_as_tuple(txId),
- std::forward_as_tuple());
+ ++TxRefs[txId];
+ if (RollbackState) {
+ RollbackOps.emplace_back(TRollbackRemoveTxRef{ txId });
}
-
- return it->second;
}
void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, ui64 txId)
{
+ auto& memTable = MemTable();
+ bool hadTxRef = memTable.GetTxIdStats().contains(txId);
+
// Use a special row version that marks this update as uncommitted
TRowVersion rowVersion(Max<ui64>(), txId);
MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions);
- Y_VERIFY_DEBUG(Mutable->GetTxIdStats().contains(txId));
-
- auto& openTx = AddOpenTransaction(txId);
- if (openTx.Mem.insert(Mutable).second && RollbackState) {
- RollbackOps.emplace_back(TRollbackRemoveOpenTxMem{ txId, Mutable });
+ if (!hadTxRef) {
+ Y_VERIFY_DEBUG(memTable.GetTxIdStats().contains(txId));
+ AddTxRef(txId);
+ } else {
+ Y_VERIFY_DEBUG(TxRefs[txId] > 0);
}
}
@@ -901,13 +889,18 @@ void TTable::RemoveTx(ui64 txId)
bool TTable::HasOpenTx(ui64 txId) const
{
- if (OpenTransactions.contains(txId)) {
+ if (TxRefs.contains(txId)) {
return !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId);
}
return false;
}
+bool TTable::HasTxData(ui64 txId) const
+{
+ return TxRefs.contains(txId);
+}
+
bool TTable::HasCommittedTx(ui64 txId) const
{
return CommittedTransactions.Find(txId);
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index 14f0ebf2622..1cc1e3cfdba 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -163,6 +163,7 @@ public:
* Returns true when table has an open transaction that is not committed or removed yet
*/
bool HasOpenTx(ui64 txId) const;
+ bool HasTxData(ui64 txId) const;
bool HasCommittedTx(ui64 txId) const;
bool HasRemovedTx(ui64 txId) const;
@@ -315,14 +316,7 @@ private:
void RemoveStat(const TPartView& partView);
private:
- struct TOpenTransaction {
- THashSet<TIntrusiveConstPtr<TMemTable>> Mem;
- THashSet<TIntrusiveConstPtr<TPart>> Parts;
- };
-
- using TOpenTransactions = THashMap<ui64, TOpenTransaction>;
-
- TOpenTransaction& AddOpenTransaction(ui64 txId);
+ void AddTxRef(ui64 txId);
private:
TEpoch Epoch; /* Monotonic table change number, with holes */
@@ -343,19 +337,14 @@ private:
TRowVersionRanges RemovedRowVersions;
+ THashMap<ui64, size_t> TxRefs;
THashSet<ui64> CheckTransactions;
- TOpenTransactions OpenTransactions;
TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
private:
- struct TRollbackRemoveOpenTx {
- ui64 TxId;
- };
-
- struct TRollbackRemoveOpenTxMem {
+ struct TRollbackRemoveTxRef {
ui64 TxId;
- TIntrusiveConstPtr<TMemTable> Mem;
};
struct TRollbackAddCommittedTx {
@@ -376,8 +365,7 @@ private:
};
using TRollbackOp = std::variant<
- TRollbackRemoveOpenTx,
- TRollbackRemoveOpenTxMem,
+ TRollbackRemoveTxRef,
TRollbackAddCommittedTx,
TRollbackRemoveCommittedTx,
TRollbackAddRemovedTx,