diff options
author | snaury <snaury@ydb.tech> | 2023-03-16 13:29:06 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-03-16 13:29:06 +0300 |
commit | 4568fe3f6ddb72499ef17f5e0a1e7e2475a9570b (patch) | |
tree | e2a0337262ed6bb70ee58d7498e9dd2bb5f37bee | |
parent | cf26acdd0906492b5946a9787a5834d2a3f3e3c6 (diff) | |
download | ydb-4568fe3f6ddb72499ef17f5e0a1e7e2475a9570b.tar.gz |
Prefer total commit order for volatile transactions instead of disk reads
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_data_tx_unit.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 75 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.h | 15 |
8 files changed, 137 insertions, 13 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 27f1076cdf5..d35b5da0ad9 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1884,4 +1884,10 @@ message TTxVolatileDetails { // An optional change group for committing change collector changes optional uint64 ChangeGroup = 6; + + // A relative commit order local to the shard + optional uint64 CommitOrder = 7; + + // When true all preceding transactions are dependencies + optional bool CommitOrdered = 8; } diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 8cd85db72ac..819affd1509 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -393,7 +393,7 @@ public: TVector<ui64> GetVolatileDependencies() const { TVector<ui64> dependencies; - if (!VolatileDependencies.empty()) { + if (!VolatileDependencies.empty() && !VolatileCommitOrdered) { dependencies.reserve(VolatileDependencies.size()); for (ui64 dependency : VolatileDependencies) { dependencies.push_back(dependency); @@ -407,6 +407,10 @@ public: return ChangeGroup; } + bool GetVolatileCommitOrdered() const { + return VolatileCommitOrdered; + } + bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override { if (TSysTables::IsSystemTable(key.TableId)) return DataShardSysTable(key.TableId).IsValidKey(key); @@ -785,11 +789,17 @@ public: } void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> row) { - if (!Self->GetVolatileTxManager().GetTxMap() && - !Self->SysLocksTable().HasWriteLocks(tableId)) - { - // We don't have any uncommitted changes, so there's nothing we - // could possibly conflict with. + // When there are uncommitted changes (write locks) we must find which + // locks would break upon commit. + bool mustFindConflicts = Self->SysLocksTable().HasWriteLocks(tableId); + + // When there are volatile changes (tx map) we try to find precise + // dependencies, but we may switch to total order on page faults. + const bool tryFindConflicts = mustFindConflicts || + (!VolatileCommitOrdered && Self->GetVolatileTxManager().GetTxMap()); + + if (!tryFindConflicts) { + // We don't need to find conflicts return; } @@ -804,8 +814,14 @@ public: NTable::ITransactionObserverPtr txObserver; if (LockTxId) { txObserver = new TLockedWriteTxObserver(this, LockTxId, skipCount, localTid); + // Locked writes are immediate, increased latency is not critical + mustFindConflicts = true; } else { txObserver = new TWriteTxObserver(this); + // Prefer precise conflicts for non-distributed transactions + if (IsImmediateTx) { + mustFindConflicts = true; + } } // We are not actually interested in the row version, we only need to @@ -815,7 +831,19 @@ public: nullptr, txObserver); if (res.Ready == NTable::EReady::Page) { - throw TNotReadyTabletException(); + if (mustFindConflicts || LockTxId) { + // We must gather all conflicts + throw TNotReadyTabletException(); + } + + // Upgrade to volatile ordered commit and ignore the page fault + if (!VolatileCommitOrdered) { + if (!VolatileTxId) { + VolatileTxId = EngineBay.GetTxId(); + } + VolatileCommitOrdered = true; + } + return; } if (LockTxId || VolatileTxId) { @@ -969,6 +997,7 @@ private: mutable absl::flat_hash_set<ui64> VolatileCommitTxIds; mutable absl::flat_hash_set<ui64> VolatileDependencies; std::optional<ui64> ChangeGroup = std::nullopt; + bool VolatileCommitOrdered = false; }; // @@ -1205,6 +1234,13 @@ std::optional<ui64> TEngineBay::GetVolatileChangeGroup() const { return host->GetVolatileChangeGroup(); } +bool TEngineBay::GetVolatileCommitOrdered() const { + Y_VERIFY(EngineHost); + + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + return host->GetVolatileCommitOrdered(); +} + IEngineFlat * TEngineBay::GetEngine() { if (!Engine) { Engine = CreateEngineFlat(*EngineSettings); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 42515d42446..f6d6446fd29 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -115,6 +115,7 @@ public: TVector<ui64> GetVolatileCommitTxIds() const; TVector<ui64> GetVolatileDependencies() const; std::optional<ui64> GetVolatileChangeGroup() const; + bool GetVolatileCommitOrdered() const; void ResetCounters() { EngineHostCounters = TEngineHostCounters(); } const TEngineHostCounters& GetCounters() const { return EngineHostCounters; } diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 5b114d06f39..1ffdab103bd 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -191,6 +191,7 @@ public: TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); } TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); } std::optional<ui64> GetVolatileChangeGroup() const { return EngineBay.GetVolatileChangeGroup(); } + bool GetVolatileCommitOrdered() const { return EngineBay.GetVolatileCommitOrdered(); } TActorId Source() const { return Source_; } void SetSource(const TActorId& actorId) { Source_ = actorId; } diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index e7feeae7a93..d0090b84ad5 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -328,6 +328,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, tx->GetDataTx()->GetVolatileDependencies(), participants, tx->GetDataTx()->GetVolatileChangeGroup(), + tx->GetDataTx()->GetVolatileCommitOrdered(), txc); } diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index bd6400a9919..be8d2a8d554 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -325,6 +325,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio dataTx->GetVolatileDependencies(), participants, dataTx->GetVolatileChangeGroup(), + dataTx->GetVolatileCommitOrdered(), txc); } diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 275ab15d439..aebb0516cd1 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -65,6 +65,8 @@ namespace NKikimr::NDataShard { Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc); + Self->VolatileTxManager.RemoveFromCommitOrder(info); + if (info->AddCommitted) { OnCommitted(ctx); } else { @@ -189,7 +191,9 @@ namespace NKikimr::NDataShard { VolatileTxs.clear(); VolatileTxByVersion.clear(); VolatileTxByCommitTxId.clear(); + VolatileTxByCommitOrder.Clear(); TxMap.Reset(); + NextCommitOrder = 1; } bool TVolatileTxManager::Load(NIceDb::TNiceDb& db) { @@ -199,6 +203,7 @@ namespace NKikimr::NDataShard { VolatileTxs.empty() && VolatileTxByVersion.empty() && VolatileTxByCommitTxId.empty() && + VolatileTxByCommitOrder.Empty() && !TxMap, "Unexpected Load into non-empty volatile tx manager"); @@ -231,7 +236,9 @@ namespace NKikimr::NDataShard { } break; case EVolatileTxState::Committed: - PendingCommits.push_back(pr.first); + if (ReadyToDbCommit(pr.second.get())) { + PendingCommits.push_back(pr.first); + } break; case EVolatileTxState::Aborting: PendingAborts.push_back(pr.first); @@ -252,6 +259,8 @@ namespace NKikimr::NDataShard { return false; } + ui64 maxCommitOrder = 0; + while (!rowset.EndOfSet()) { ui64 txId = rowset.GetValue<Schema::TxVolatileDetails::TxId>(); EVolatileTxState state = rowset.GetValue<Schema::TxVolatileDetails::State>(); @@ -274,13 +283,23 @@ namespace NKikimr::NDataShard { info->ChangeGroup = details.GetChangeGroup(); } info->AddCommitted = true; // we loaded it from local db, so it is committed + info->CommitOrder = details.GetCommitOrder(); + info->CommitOrdered = details.GetCommitOrdered(); + + maxCommitOrder = Max(maxCommitOrder, info->CommitOrder); if (!rowset.Next()) { return false; } } - auto postProcessTxInfo = [this](TVolatileTxInfo* info) { + NextCommitOrder = maxCommitOrder + 1; + + // Prepare and sort a vector later (intrusive list sorting isn't good enough) + std::vector<TVolatileTxInfo*> byCommitOrder; + byCommitOrder.reserve(VolatileTxs.size()); + + auto postProcessTxInfo = [this, &byCommitOrder](TVolatileTxInfo* info) { switch (info->State) { case EVolatileTxState::Waiting: case EVolatileTxState::Committed: { @@ -308,6 +327,7 @@ namespace NKikimr::NDataShard { ++it; } + byCommitOrder.push_back(info); return; } @@ -327,6 +347,13 @@ namespace NKikimr::NDataShard { VolatileTxByVersion.insert(pr.second.get()); } + std::sort(byCommitOrder.begin(), byCommitOrder.end(), [](TVolatileTxInfo* a, TVolatileTxInfo* b) -> bool { + return a->CommitOrder < b->CommitOrder; + }); + for (TVolatileTxInfo* info : byCommitOrder) { + VolatileTxByCommitOrder.PushBack(info); + } + return true; } @@ -393,6 +420,7 @@ namespace NKikimr::NDataShard { TConstArrayRef<ui64> dependencies, TConstArrayRef<ui64> participants, std::optional<ui64> changeGroup, + bool commitOrdered, TTransactionContext& txc) { using Schema = TDataShard::Schema; @@ -412,6 +440,8 @@ namespace NKikimr::NDataShard { info->Dependencies.insert(dependencies.begin(), dependencies.end()); info->Participants.insert(participants.begin(), participants.end()); info->ChangeGroup = changeGroup; + info->CommitOrder = NextCommitOrder++; + info->CommitOrdered = commitOrdered; if (info->Participants.empty()) { // Transaction is committed when we don't have to wait for other participants @@ -419,6 +449,7 @@ namespace NKikimr::NDataShard { } VolatileTxByVersion.insert(info); + VolatileTxByCommitOrder.PushBack(info); if (!TxMap) { TxMap = MakeIntrusive<TTxMap>(); @@ -461,6 +492,11 @@ namespace NKikimr::NDataShard { details.SetChangeGroup(*info->ChangeGroup); } + details.SetCommitOrder(info->CommitOrder); + if (info->CommitOrdered) { + details.SetCommitOrdered(true); + } + db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Update( NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State), NIceDb::TUpdate<Schema::TxVolatileDetails::Details>(std::move(details))); @@ -481,7 +517,7 @@ namespace NKikimr::NDataShard { Y_VERIFY_S(false, "Unexpected rollback of volatile txId# " << txId); }); - if (info->State == EVolatileTxState::Committed && info->Dependencies.empty()) { + if (ReadyToDbCommit(info)) { AddPendingCommit(info->TxId); } } @@ -507,6 +543,7 @@ namespace NKikimr::NDataShard { Y_VERIFY_S(info->Dependencies.empty(), "Unexpected remove of volatile tx " << txId << " with dependencies"); Y_VERIFY_S(info->Dependents.empty(), "Unexpected remove of volatile tx " << txId << " with dependents"); + Y_VERIFY_S(info->Empty(), "Unexpected remove of volatile tx " << txId << " which is in commit order linked list"); UnblockWaitingRemovalOperations(info); @@ -616,6 +653,10 @@ namespace NKikimr::NDataShard { // We will unblock operations when we persist the abort AddPendingAbort(txId); + + // Note that abort is always enqueued, never executed immediately, + // so it is safe to use info in this call. + RemoveFromCommitOrder(info); } void TVolatileTxManager::ProcessReadSet( @@ -703,7 +744,7 @@ namespace NKikimr::NDataShard { if (info->AddCommitted) { RunCommitCallbacks(info); } - if (info->Dependencies.empty()) { + if (info->Dependencies.empty() && ReadyToDbCommit(info)) { AddPendingCommit(txId); } } @@ -820,4 +861,30 @@ namespace NKikimr::NDataShard { } } + void TVolatileTxManager::RemoveFromCommitOrder(TVolatileTxInfo* info) { + Y_VERIFY(!info->Empty(), "Volatile transaction is not in a commit order linked list"); + Y_VERIFY(!VolatileTxByCommitOrder.Empty(), "Commit order linked list is unexpectedly empty"); + const bool wasFirst = VolatileTxByCommitOrder.Front() == info; + info->Unlink(); + if (wasFirst && !VolatileTxByCommitOrder.Empty()) { + auto* next = VolatileTxByCommitOrder.Front(); + if (next->CommitOrdered && ReadyToDbCommit(next)) { + AddPendingCommit(next->TxId); + } + } + } + + bool TVolatileTxManager::ReadyToDbCommit(TVolatileTxInfo* info) const { + if (info->State == EVolatileTxState::Committed && info->Dependencies.empty()) { + if (info->CommitOrdered) { + Y_VERIFY_DEBUG(!VolatileTxByCommitOrder.Empty()); + return VolatileTxByCommitOrder.Front() == info; + } + + return true; + } + + return false; + } + } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index 6af069540ec..627c0c24236 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -5,6 +5,7 @@ #include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> #include <library/cpp/containers/stack_vector/stack_vec.h> #include <util/generic/hash.h> +#include <util/generic/intrlist.h> namespace NKikimr::NTabletFlatExecutor { @@ -40,16 +41,20 @@ namespace NKikimr::NDataShard { virtual void OnAbort(ui64 txId) = 0; }; - struct TVolatileTxInfo { + struct TVolatileTxInfo + : public TIntrusiveListItem<TVolatileTxInfo> + { + ui64 CommitOrder; ui64 TxId; EVolatileTxState State = EVolatileTxState::Waiting; + bool AddCommitted = false; + bool CommitOrdered = false; TRowVersion Version; absl::flat_hash_set<ui64> CommitTxIds; absl::flat_hash_set<ui64> Dependencies; absl::flat_hash_set<ui64> Dependents; absl::flat_hash_set<ui64> Participants; std::optional<ui64> ChangeGroup; - bool AddCommitted = false; absl::flat_hash_set<ui64> BlockedOperations; absl::flat_hash_set<ui64> WaitingRemovalOperations; TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks; @@ -177,6 +182,7 @@ namespace NKikimr::NDataShard { TConstArrayRef<ui64> dependencies, TConstArrayRef<ui64> participants, std::optional<ui64> changeGroup, + bool commitOrdered, TTransactionContext& txc); bool AttachVolatileTxCallback( @@ -222,15 +228,20 @@ namespace NKikimr::NDataShard { void RunPendingCommitTx(); void RunPendingAbortTx(); + void RemoveFromCommitOrder(TVolatileTxInfo* info); + bool ReadyToDbCommit(TVolatileTxInfo* info) const; + private: TDataShard* const Self; absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info TVolatileTxByVersion VolatileTxByVersion; + TIntrusiveList<TVolatileTxInfo> VolatileTxByCommitOrder; std::vector<TWaitingSnapshotEvent> WaitingSnapshotEvents; TIntrusivePtr<TTxMap> TxMap; std::deque<ui64> PendingCommits; std::deque<ui64> PendingAborts; + ui64 NextCommitOrder = 1; bool PendingCommitTxScheduled = false; bool PendingAbortTxScheduled = false; }; |