aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-05-22 16:09:47 +0300
committersnaury <snaury@ydb.tech>2023-05-22 16:09:47 +0300
commitc73082e61b645e3117d59a1e14e5d3e5159012bd (patch)
tree4fcb584062584061dbc4fcf56f9b0c9559cce41b
parent1366a3159c4a93d17b9cb243145613270891df00 (diff)
downloadydb-c73082e61b645e3117d59a1e14e5d3e5159012bd.tar.gz
Track open transactions in a table for a fast open tx count
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp7
-rw-r--r--ydb/core/tablet_flat/flat_database.h11
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp60
-rw-r--r--ydb/core/tablet_flat/flat_table.h16
-rw-r--r--ydb/core/tx/datashard/datashard.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard.h5
-rw-r--r--ydb/core/tx/datashard/make_snapshot_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp3
8 files changed, 80 insertions, 28 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 93c3411da37..7150ae5be3f 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -374,11 +374,16 @@ bool TDatabase::HasRemovedTx(ui32 table, ui64 txId) const
return Require(table)->HasRemovedTx(txId);
}
-TVector<ui64> TDatabase::GetOpenTxs(ui32 table) const
+const absl::flat_hash_set<ui64>& TDatabase::GetOpenTxs(ui32 table) const
{
return Require(table)->GetOpenTxs();
}
+size_t TDatabase::GetOpenTxCount(ui32 table) const
+{
+ return Require(table)->GetOpenTxCount();
+}
+
void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper)
{
if (Y_LIKELY(lower < upper)) {
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 9410e4906c0..c2eaa3fb854 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -129,11 +129,18 @@ public:
bool HasRemovedTx(ui32 table, ui64 txId) const;
/**
- * Returns a list of open transactions in the provided table. This only
+ * Returns a set of open transactions in the provided table. This only
* includes transactions with changes that are neither committed nor
* removed.
*/
- TVector<ui64> GetOpenTxs(ui32 table) const;
+ const absl::flat_hash_set<ui64>& GetOpenTxs(ui32 table) const;
+
+ /**
+ * Returns a number of open transactions in the provided table. This only
+ * includes transactions with changes that are neither committed nor
+ * removed.
+ */
+ size_t GetOpenTxCount(ui32 table) const;
/**
* Remove row versions [lower, upper) from the given table
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index 5e976d64c2e..80d5bbe8ecd 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -64,6 +64,14 @@ void TTable::RollbackChanges()
void operator()(const TRollbackRemoveRemovedTx& op) const {
Self->RemovedTransactions.Remove(op.TxId);
}
+
+ void operator()(const TRollbackAddOpenTx& op) const {
+ Self->OpenTxs.insert(op.TxId);
+ }
+
+ void operator()(const TRollbackRemoveOpenTx& op) const {
+ Self->OpenTxs.erase(op.TxId);
+ }
};
std::visit(TApplyRollbackOp{ this }, RollbackOps.back());
@@ -477,6 +485,7 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
CheckTransactions.insert(txId);
}
TxRefs.erase(it);
+ OpenTxs.erase(txId);
}
}
@@ -592,6 +601,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
if (!TxRefs.contains(txId)) {
CheckTransactions.insert(txId);
}
+ OpenTxs.erase(txId);
}
for (auto& item : txStatus->TxStatusPage->GetRemovedItems()) {
const ui64 txId = item.GetTxId();
@@ -601,6 +611,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
if (!TxRefs.contains(txId)) {
CheckTransactions.insert(txId);
}
+ OpenTxs.erase(txId);
}
if (Mutable && txStatus->Epoch >= Mutable->Epoch) {
@@ -723,7 +734,10 @@ void TTable::AddSafe(TPartView partView)
if (partView->TxIdStats) {
for (const auto& item : partView->TxIdStats->GetItems()) {
const ui64 txId = item.GetTxId();
- ++TxRefs[txId];
+ const auto newCount = ++TxRefs[txId];
+ if (newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId)) {
+ OpenTxs.insert(txId);
+ }
}
}
@@ -820,9 +834,17 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemG
void TTable::AddTxRef(ui64 txId)
{
- ++TxRefs[txId];
+ const auto newCount = ++TxRefs[txId];
+ const bool addOpenTx = newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId);
+ if (addOpenTx) {
+ auto res = OpenTxs.insert(txId);
+ Y_VERIFY(res.second);
+ }
if (RollbackState) {
RollbackOps.emplace_back(TRollbackRemoveTxRef{ txId });
+ if (addOpenTx) {
+ RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId });
+ }
}
}
@@ -865,6 +887,12 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
}
RemovedTransactions.Remove(txId);
}
+ if (auto it = OpenTxs.find(txId); it != OpenTxs.end()) {
+ if (RollbackState) {
+ RollbackOps.emplace_back(TRollbackAddOpenTx{ txId });
+ }
+ OpenTxs.erase(it);
+ }
}
// We don't know which keys have been commited, invalidate everything
@@ -884,16 +912,18 @@ void TTable::RemoveTx(ui64 txId)
RollbackOps.emplace_back(TRollbackRemoveRemovedTx{ txId });
}
RemovedTransactions.Add(txId);
+ if (auto it = OpenTxs.find(txId); it != OpenTxs.end()) {
+ if (RollbackState) {
+ RollbackOps.emplace_back(TRollbackAddOpenTx{ txId });
+ }
+ OpenTxs.erase(it);
+ }
}
}
bool TTable::HasOpenTx(ui64 txId) const
{
- if (TxRefs.contains(txId)) {
- return !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId);
- }
-
- return false;
+ return OpenTxs.contains(txId);
}
bool TTable::HasTxData(ui64 txId) const
@@ -911,18 +941,14 @@ bool TTable::HasRemovedTx(ui64 txId) const
return RemovedTransactions.Contains(txId);
}
-TVector<ui64> TTable::GetOpenTxs() const
+const absl::flat_hash_set<ui64>& TTable::GetOpenTxs() const
{
- TVector<ui64> txs;
-
- for (auto& pr : TxRefs) {
- ui64 txId = pr.first;
- if (!CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId)) {
- txs.push_back(txId);
- }
- }
+ return OpenTxs;
+}
- return txs;
+size_t TTable::GetOpenTxCount() const
+{
+ return OpenTxs.size();
}
TMemTable& TTable::MemTable()
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index 54430fcfbc9..6200fa9087a 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -172,7 +172,8 @@ public:
bool HasCommittedTx(ui64 txId) const;
bool HasRemovedTx(ui64 txId) const;
- TVector<ui64> GetOpenTxs() const;
+ const absl::flat_hash_set<ui64>& GetOpenTxs() const;
+ size_t GetOpenTxCount() const;
TPartView GetPartView(const TLogoBlobID &bundle) const
{
@@ -345,6 +346,7 @@ private:
TRowVersionRanges RemovedRowVersions;
absl::flat_hash_map<ui64, size_t> TxRefs;
+ absl::flat_hash_set<ui64> OpenTxs;
absl::flat_hash_set<ui64> CheckTransactions;
TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
@@ -371,12 +373,22 @@ private:
ui64 TxId;
};
+ struct TRollbackAddOpenTx {
+ ui64 TxId;
+ };
+
+ struct TRollbackRemoveOpenTx {
+ ui64 TxId;
+ };
+
using TRollbackOp = std::variant<
TRollbackRemoveTxRef,
TRollbackAddCommittedTx,
TRollbackRemoveCommittedTx,
TRollbackAddRemovedTx,
- TRollbackRemoveRemovedTx>;
+ TRollbackRemoveRemovedTx,
+ TRollbackAddOpenTx,
+ TRollbackRemoveOpenTx>;
struct TRollbackState {
TEpoch Epoch;
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 84c1bb336cf..13a11588c64 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -4041,9 +4041,9 @@ public:
auto it = pathId ? Self->GetUserTables().find(pathId.LocalPathId) : Self->GetUserTables().begin();
Y_VERIFY(it != Self->GetUserTables().end());
- auto txs = txc.DB.GetOpenTxs(it->second->LocalTid);
+ auto openTxs = txc.DB.GetOpenTxs(it->second->LocalTid);
- Reply = MakeHolder<TEvDataShard::TEvGetOpenTxsResult>(pathId, std::move(txs));
+ Reply = MakeHolder<TEvDataShard::TEvGetOpenTxsResult>(pathId, std::move(openTxs));
return true;
}
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 2c3b33190d8..aa520ca4a27 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -15,6 +15,7 @@
#include <library/cpp/lwtrace/shuttle.h>
#include <library/cpp/time_provider/time_provider.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
namespace arrow {
@@ -1608,9 +1609,9 @@ struct TEvDataShard {
struct TEvGetOpenTxsResult : public TEventLocal<TEvGetOpenTxsResult, EvGetOpenTxsResult> {
TPathId PathId;
- TVector<ui64> OpenTxs;
+ absl::flat_hash_set<ui64> OpenTxs;
- TEvGetOpenTxsResult(const TPathId& pathId, TVector<ui64> openTxs)
+ TEvGetOpenTxsResult(const TPathId& pathId, absl::flat_hash_set<ui64> openTxs)
: PathId(pathId)
, OpenTxs(std::move(openTxs))
{ }
diff --git a/ydb/core/tx/datashard/make_snapshot_unit.cpp b/ydb/core/tx/datashard/make_snapshot_unit.cpp
index 4321daa414b..7e8c76af89d 100644
--- a/ydb/core/tx/datashard/make_snapshot_unit.cpp
+++ b/ydb/core/tx/datashard/make_snapshot_unit.cpp
@@ -59,7 +59,7 @@ EExecutionStatus TMakeSnapshotUnit::Execute(TOperation::TPtr op,
Y_VERIFY(DataShard.GetUserTables().contains(tableId));
ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid;
- auto openTxs = txc.DB.GetOpenTxs(localTableId);
+ const auto& openTxs = txc.DB.GetOpenTxs(localTableId);
TIntrusivePtr<TTableSnapshotContext> snapContext
= new TTxTableSnapshotContext(op->GetStep(), op->GetTxId(), {localTableId}, !openTxs.empty());
txc.Env.MakeSnapshot(snapContext);
diff --git a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
index 84238821a02..1ccb4e10396 100644
--- a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
+++ b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
@@ -50,7 +50,8 @@ EExecutionStatus TReceiveSnapshotCleanupUnit::Execute(TOperation::TPtr op,
size_t removedTxs = 0;
for (const auto& pr : DataShard.GetUserTables()) {
auto localTid = pr.second->LocalTid;
- for (ui64 txId : txc.DB.GetOpenTxs(localTid)) {
+ auto openTxs = txc.DB.GetOpenTxs(localTid);
+ for (ui64 txId : openTxs) {
if (removedTxs >= 1000) {
// We don't want to remove more than 1000 txs at a time
// Commit current changes and reschedule