diff options
author | snaury <snaury@ydb.tech> | 2023-05-22 16:09:47 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-05-22 16:09:47 +0300 |
commit | c73082e61b645e3117d59a1e14e5d3e5159012bd (patch) | |
tree | 4fcb584062584061dbc4fcf56f9b0c9559cce41b | |
parent | 1366a3159c4a93d17b9cb243145613270891df00 (diff) | |
download | ydb-c73082e61b645e3117d59a1e14e5d3e5159012bd.tar.gz |
Track open transactions in a table for a fast open tx count
-rw-r--r-- | ydb/core/tablet_flat/flat_database.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.h | 11 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.cpp | 60 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.h | 16 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/make_snapshot_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp | 3 |
8 files changed, 80 insertions, 28 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 93c3411da37..7150ae5be3f 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -374,11 +374,16 @@ bool TDatabase::HasRemovedTx(ui32 table, ui64 txId) const return Require(table)->HasRemovedTx(txId); } -TVector<ui64> TDatabase::GetOpenTxs(ui32 table) const +const absl::flat_hash_set<ui64>& TDatabase::GetOpenTxs(ui32 table) const { return Require(table)->GetOpenTxs(); } +size_t TDatabase::GetOpenTxCount(ui32 table) const +{ + return Require(table)->GetOpenTxCount(); +} + void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper) { if (Y_LIKELY(lower < upper)) { diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 9410e4906c0..c2eaa3fb854 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -129,11 +129,18 @@ public: bool HasRemovedTx(ui32 table, ui64 txId) const; /** - * Returns a list of open transactions in the provided table. This only + * Returns a set of open transactions in the provided table. This only * includes transactions with changes that are neither committed nor * removed. */ - TVector<ui64> GetOpenTxs(ui32 table) const; + const absl::flat_hash_set<ui64>& GetOpenTxs(ui32 table) const; + + /** + * Returns a number of open transactions in the provided table. This only + * includes transactions with changes that are neither committed nor + * removed. + */ + size_t GetOpenTxCount(ui32 table) const; /** * Remove row versions [lower, upper) from the given table diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index 5e976d64c2e..80d5bbe8ecd 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -64,6 +64,14 @@ void TTable::RollbackChanges() void operator()(const TRollbackRemoveRemovedTx& op) const { Self->RemovedTransactions.Remove(op.TxId); } + + void operator()(const TRollbackAddOpenTx& op) const { + Self->OpenTxs.insert(op.TxId); + } + + void operator()(const TRollbackRemoveOpenTx& op) const { + Self->OpenTxs.erase(op.TxId); + } }; std::visit(TApplyRollbackOp{ this }, RollbackOps.back()); @@ -477,6 +485,7 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset CheckTransactions.insert(txId); } TxRefs.erase(it); + OpenTxs.erase(txId); } } @@ -592,6 +601,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept if (!TxRefs.contains(txId)) { CheckTransactions.insert(txId); } + OpenTxs.erase(txId); } for (auto& item : txStatus->TxStatusPage->GetRemovedItems()) { const ui64 txId = item.GetTxId(); @@ -601,6 +611,7 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept if (!TxRefs.contains(txId)) { CheckTransactions.insert(txId); } + OpenTxs.erase(txId); } if (Mutable && txStatus->Epoch >= Mutable->Epoch) { @@ -723,7 +734,10 @@ void TTable::AddSafe(TPartView partView) if (partView->TxIdStats) { for (const auto& item : partView->TxIdStats->GetItems()) { const ui64 txId = item.GetTxId(); - ++TxRefs[txId]; + const auto newCount = ++TxRefs[txId]; + if (newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId)) { + OpenTxs.insert(txId); + } } } @@ -820,9 +834,17 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemG void TTable::AddTxRef(ui64 txId) { - ++TxRefs[txId]; + const auto newCount = ++TxRefs[txId]; + const bool addOpenTx = newCount == 1 && !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId); + if (addOpenTx) { + auto res = OpenTxs.insert(txId); + Y_VERIFY(res.second); + } if (RollbackState) { RollbackOps.emplace_back(TRollbackRemoveTxRef{ txId }); + if (addOpenTx) { + RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId }); + } } } @@ -865,6 +887,12 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion) } RemovedTransactions.Remove(txId); } + if (auto it = OpenTxs.find(txId); it != OpenTxs.end()) { + if (RollbackState) { + RollbackOps.emplace_back(TRollbackAddOpenTx{ txId }); + } + OpenTxs.erase(it); + } } // We don't know which keys have been commited, invalidate everything @@ -884,16 +912,18 @@ void TTable::RemoveTx(ui64 txId) RollbackOps.emplace_back(TRollbackRemoveRemovedTx{ txId }); } RemovedTransactions.Add(txId); + if (auto it = OpenTxs.find(txId); it != OpenTxs.end()) { + if (RollbackState) { + RollbackOps.emplace_back(TRollbackAddOpenTx{ txId }); + } + OpenTxs.erase(it); + } } } bool TTable::HasOpenTx(ui64 txId) const { - if (TxRefs.contains(txId)) { - return !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId); - } - - return false; + return OpenTxs.contains(txId); } bool TTable::HasTxData(ui64 txId) const @@ -911,18 +941,14 @@ bool TTable::HasRemovedTx(ui64 txId) const return RemovedTransactions.Contains(txId); } -TVector<ui64> TTable::GetOpenTxs() const +const absl::flat_hash_set<ui64>& TTable::GetOpenTxs() const { - TVector<ui64> txs; - - for (auto& pr : TxRefs) { - ui64 txId = pr.first; - if (!CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId)) { - txs.push_back(txId); - } - } + return OpenTxs; +} - return txs; +size_t TTable::GetOpenTxCount() const +{ + return OpenTxs.size(); } TMemTable& TTable::MemTable() diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 54430fcfbc9..6200fa9087a 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -172,7 +172,8 @@ public: bool HasCommittedTx(ui64 txId) const; bool HasRemovedTx(ui64 txId) const; - TVector<ui64> GetOpenTxs() const; + const absl::flat_hash_set<ui64>& GetOpenTxs() const; + size_t GetOpenTxCount() const; TPartView GetPartView(const TLogoBlobID &bundle) const { @@ -345,6 +346,7 @@ private: TRowVersionRanges RemovedRowVersions; absl::flat_hash_map<ui64, size_t> TxRefs; + absl::flat_hash_set<ui64> OpenTxs; absl::flat_hash_set<ui64> CheckTransactions; TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; @@ -371,12 +373,22 @@ private: ui64 TxId; }; + struct TRollbackAddOpenTx { + ui64 TxId; + }; + + struct TRollbackRemoveOpenTx { + ui64 TxId; + }; + using TRollbackOp = std::variant< TRollbackRemoveTxRef, TRollbackAddCommittedTx, TRollbackRemoveCommittedTx, TRollbackAddRemovedTx, - TRollbackRemoveRemovedTx>; + TRollbackRemoveRemovedTx, + TRollbackAddOpenTx, + TRollbackRemoveOpenTx>; struct TRollbackState { TEpoch Epoch; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 84c1bb336cf..13a11588c64 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -4041,9 +4041,9 @@ public: auto it = pathId ? Self->GetUserTables().find(pathId.LocalPathId) : Self->GetUserTables().begin(); Y_VERIFY(it != Self->GetUserTables().end()); - auto txs = txc.DB.GetOpenTxs(it->second->LocalTid); + auto openTxs = txc.DB.GetOpenTxs(it->second->LocalTid); - Reply = MakeHolder<TEvDataShard::TEvGetOpenTxsResult>(pathId, std::move(txs)); + Reply = MakeHolder<TEvDataShard::TEvGetOpenTxsResult>(pathId, std::move(openTxs)); return true; } diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 2c3b33190d8..aa520ca4a27 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -15,6 +15,7 @@ #include <library/cpp/lwtrace/shuttle.h> #include <library/cpp/time_provider/time_provider.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> namespace arrow { @@ -1608,9 +1609,9 @@ struct TEvDataShard { struct TEvGetOpenTxsResult : public TEventLocal<TEvGetOpenTxsResult, EvGetOpenTxsResult> { TPathId PathId; - TVector<ui64> OpenTxs; + absl::flat_hash_set<ui64> OpenTxs; - TEvGetOpenTxsResult(const TPathId& pathId, TVector<ui64> openTxs) + TEvGetOpenTxsResult(const TPathId& pathId, absl::flat_hash_set<ui64> openTxs) : PathId(pathId) , OpenTxs(std::move(openTxs)) { } diff --git a/ydb/core/tx/datashard/make_snapshot_unit.cpp b/ydb/core/tx/datashard/make_snapshot_unit.cpp index 4321daa414b..7e8c76af89d 100644 --- a/ydb/core/tx/datashard/make_snapshot_unit.cpp +++ b/ydb/core/tx/datashard/make_snapshot_unit.cpp @@ -59,7 +59,7 @@ EExecutionStatus TMakeSnapshotUnit::Execute(TOperation::TPtr op, Y_VERIFY(DataShard.GetUserTables().contains(tableId)); ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; - auto openTxs = txc.DB.GetOpenTxs(localTableId); + const auto& openTxs = txc.DB.GetOpenTxs(localTableId); TIntrusivePtr<TTableSnapshotContext> snapContext = new TTxTableSnapshotContext(op->GetStep(), op->GetTxId(), {localTableId}, !openTxs.empty()); txc.Env.MakeSnapshot(snapContext); diff --git a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp index 84238821a02..1ccb4e10396 100644 --- a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp +++ b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp @@ -50,7 +50,8 @@ EExecutionStatus TReceiveSnapshotCleanupUnit::Execute(TOperation::TPtr op, size_t removedTxs = 0; for (const auto& pr : DataShard.GetUserTables()) { auto localTid = pr.second->LocalTid; - for (ui64 txId : txc.DB.GetOpenTxs(localTid)) { + auto openTxs = txc.DB.GetOpenTxs(localTid); + for (ui64 txId : openTxs) { if (removedTxs >= 1000) { // We don't want to remove more than 1000 txs at a time // Commit current changes and reschedule |