about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2023-03-16 13:29:06 +0300
committer snaury <snaury@ydb.tech> 2023-03-16 13:29:06 +0300
commit 4568fe3f6ddb72499ef17f5e0a1e7e2475a9570b (patch)
tree e2a0337262ed6bb70ee58d7498e9dd2bb5f37bee
parent cf26acdd0906492b5946a9787a5834d2a3f3e3c6 (diff)
download ydb-4568fe3f6ddb72499ef17f5e0a1e7e2475a9570b.tar.gz
Prefer total commit order for volatile transactions instead of disk reads
-rw-r--r-- ydb/core/protos/tx_datashard.proto 6
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.cpp 50
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.h 1
-rw-r--r-- ydb/core/tx/datashard/datashard_active_transaction.h 1
-rw-r--r-- ydb/core/tx/datashard/execute_data_tx_unit.cpp 1
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp 1
-rw-r--r-- ydb/core/tx/datashard/volatile_tx.cpp 75
-rw-r--r-- ydb/core/tx/datashard/volatile_tx.h 15
8 files changed, 137 insertions, 13 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 27f1076cdf5..d35b5da0ad9 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1884,4 +1884,10 @@ message TTxVolatileDetails {
// An optional change group for committing change collector changes
optional uint64 ChangeGroup = 6;
+
+ // A relative commit order local to the shard
+ optional uint64 CommitOrder = 7;
+
+ // When true all preceding transactions are dependencies
+ optional bool CommitOrdered = 8;
}
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 8cd85db72ac..819affd1509 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -393,7 +393,7 @@ public:
TVector<ui64> GetVolatileDependencies() const {
TVector<ui64> dependencies;
- if (!VolatileDependencies.empty()) {
+ if (!VolatileDependencies.empty() && !VolatileCommitOrdered) {
dependencies.reserve(VolatileDependencies.size());
for (ui64 dependency : VolatileDependencies) {
dependencies.push_back(dependency);
@@ -407,6 +407,10 @@ public:
return ChangeGroup;
}
+ bool GetVolatileCommitOrdered() const {
+ return VolatileCommitOrdered;
+ }
+
bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override {
if (TSysTables::IsSystemTable(key.TableId))
return DataShardSysTable(key.TableId).IsValidKey(key);
@@ -785,11 +789,17 @@ public:
}
void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> row) {
- if (!Self->GetVolatileTxManager().GetTxMap() &&
- !Self->SysLocksTable().HasWriteLocks(tableId))
- {
- // We don't have any uncommitted changes, so there's nothing we
- // could possibly conflict with.
+ // When there are uncommitted changes (write locks) we must find which
+ // locks would break upon commit.
+ bool mustFindConflicts = Self->SysLocksTable().HasWriteLocks(tableId);
+
+ // When there are volatile changes (tx map) we try to find precise
+ // dependencies, but we may switch to total order on page faults.
+ const bool tryFindConflicts = mustFindConflicts ||
+ (!VolatileCommitOrdered && Self->GetVolatileTxManager().GetTxMap());
+
+ if (!tryFindConflicts) {
+ // We don't need to find conflicts
return;
}
@@ -804,8 +814,14 @@ public:
NTable::ITransactionObserverPtr txObserver;
if (LockTxId) {
txObserver = new TLockedWriteTxObserver(this, LockTxId, skipCount, localTid);
+ // Locked writes are immediate, increased latency is not critical
+ mustFindConflicts = true;
} else {
txObserver = new TWriteTxObserver(this);
+ // Prefer precise conflicts for non-distributed transactions
+ if (IsImmediateTx) {
+ mustFindConflicts = true;
+ }
}
// We are not actually interested in the row version, we only need to
@@ -815,7 +831,19 @@ public:
nullptr, txObserver);
if (res.Ready == NTable::EReady::Page) {
- throw TNotReadyTabletException();
+ if (mustFindConflicts || LockTxId) {
+ // We must gather all conflicts
+ throw TNotReadyTabletException();
+ }
+
+ // Upgrade to volatile ordered commit and ignore the page fault
+ if (!VolatileCommitOrdered) {
+ if (!VolatileTxId) {
+ VolatileTxId = EngineBay.GetTxId();
+ }
+ VolatileCommitOrdered = true;
+ }
+ return;
}
if (LockTxId || VolatileTxId) {
@@ -969,6 +997,7 @@ private:
mutable absl::flat_hash_set<ui64> VolatileCommitTxIds;
mutable absl::flat_hash_set<ui64> VolatileDependencies;
std::optional<ui64> ChangeGroup = std::nullopt;
+ bool VolatileCommitOrdered = false;
};
//
@@ -1205,6 +1234,13 @@ std::optional<ui64> TEngineBay::GetVolatileChangeGroup() const {
return host->GetVolatileChangeGroup();
}
+bool TEngineBay::GetVolatileCommitOrdered() const {
+ Y_VERIFY(EngineHost);
+
+ auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
+ return host->GetVolatileCommitOrdered();
+}
+
IEngineFlat * TEngineBay::GetEngine() {
if (!Engine) {
Engine = CreateEngineFlat(*EngineSettings);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 42515d42446..f6d6446fd29 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -115,6 +115,7 @@ public:
TVector<ui64> GetVolatileCommitTxIds() const;
TVector<ui64> GetVolatileDependencies() const;
std::optional<ui64> GetVolatileChangeGroup() const;
+ bool GetVolatileCommitOrdered() const;
void ResetCounters() { EngineHostCounters = TEngineHostCounters(); }
const TEngineHostCounters& GetCounters() const { return EngineHostCounters; }
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index 5b114d06f39..1ffdab103bd 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -191,6 +191,7 @@ public:
TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); }
TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); }
std::optional<ui64> GetVolatileChangeGroup() const { return EngineBay.GetVolatileChangeGroup(); }
+ bool GetVolatileCommitOrdered() const { return EngineBay.GetVolatileCommitOrdered(); }
TActorId Source() const { return Source_; }
void SetSource(const TActorId& actorId) { Source_ = actorId; }
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index e7feeae7a93..d0090b84ad5 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -328,6 +328,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
tx->GetDataTx()->GetVolatileDependencies(),
participants,
tx->GetDataTx()->GetVolatileChangeGroup(),
+ tx->GetDataTx()->GetVolatileCommitOrdered(),
txc);
}
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index bd6400a9919..be8d2a8d554 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -325,6 +325,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
dataTx->GetVolatileDependencies(),
participants,
dataTx->GetVolatileChangeGroup(),
+ dataTx->GetVolatileCommitOrdered(),
txc);
}
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 275ab15d439..aebb0516cd1 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -65,6 +65,8 @@ namespace NKikimr::NDataShard {
Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc);
+ Self->VolatileTxManager.RemoveFromCommitOrder(info);
+
if (info->AddCommitted) {
OnCommitted(ctx);
} else {
@@ -189,7 +191,9 @@ namespace NKikimr::NDataShard {
VolatileTxs.clear();
VolatileTxByVersion.clear();
VolatileTxByCommitTxId.clear();
+ VolatileTxByCommitOrder.Clear();
TxMap.Reset();
+ NextCommitOrder = 1;
}
bool TVolatileTxManager::Load(NIceDb::TNiceDb& db) {
@@ -199,6 +203,7 @@ namespace NKikimr::NDataShard {
VolatileTxs.empty() &&
VolatileTxByVersion.empty() &&
VolatileTxByCommitTxId.empty() &&
+ VolatileTxByCommitOrder.Empty() &&
!TxMap,
"Unexpected Load into non-empty volatile tx manager");
@@ -231,7 +236,9 @@ namespace NKikimr::NDataShard {
}
break;
case EVolatileTxState::Committed:
- PendingCommits.push_back(pr.first);
+ if (ReadyToDbCommit(pr.second.get())) {
+ PendingCommits.push_back(pr.first);
+ }
break;
case EVolatileTxState::Aborting:
PendingAborts.push_back(pr.first);
@@ -252,6 +259,8 @@ namespace NKikimr::NDataShard {
return false;
}
+ ui64 maxCommitOrder = 0;
+
while (!rowset.EndOfSet()) {
ui64 txId = rowset.GetValue<Schema::TxVolatileDetails::TxId>();
EVolatileTxState state = rowset.GetValue<Schema::TxVolatileDetails::State>();
@@ -274,13 +283,23 @@ namespace NKikimr::NDataShard {
info->ChangeGroup = details.GetChangeGroup();
}
info->AddCommitted = true; // we loaded it from local db, so it is committed
+ info->CommitOrder = details.GetCommitOrder();
+ info->CommitOrdered = details.GetCommitOrdered();
+
+ maxCommitOrder = Max(maxCommitOrder, info->CommitOrder);
if (!rowset.Next()) {
return false;
}
}
- auto postProcessTxInfo = [this](TVolatileTxInfo* info) {
+ NextCommitOrder = maxCommitOrder + 1;
+
+ // Prepare and sort a vector later (intrusive list sorting isn't good enough)
+ std::vector<TVolatileTxInfo*> byCommitOrder;
+ byCommitOrder.reserve(VolatileTxs.size());
+
+ auto postProcessTxInfo = [this, &byCommitOrder](TVolatileTxInfo* info) {
switch (info->State) {
case EVolatileTxState::Waiting:
case EVolatileTxState::Committed: {
@@ -308,6 +327,7 @@ namespace NKikimr::NDataShard {
++it;
}
+ byCommitOrder.push_back(info);
return;
}
@@ -327,6 +347,13 @@ namespace NKikimr::NDataShard {
VolatileTxByVersion.insert(pr.second.get());
}
+ std::sort(byCommitOrder.begin(), byCommitOrder.end(), [](TVolatileTxInfo* a, TVolatileTxInfo* b) -> bool {
+ return a->CommitOrder < b->CommitOrder;
+ });
+ for (TVolatileTxInfo* info : byCommitOrder) {
+ VolatileTxByCommitOrder.PushBack(info);
+ }
+
return true;
}
@@ -393,6 +420,7 @@ namespace NKikimr::NDataShard {
TConstArrayRef<ui64> dependencies,
TConstArrayRef<ui64> participants,
std::optional<ui64> changeGroup,
+ bool commitOrdered,
TTransactionContext& txc)
{
using Schema = TDataShard::Schema;
@@ -412,6 +440,8 @@ namespace NKikimr::NDataShard {
info->Dependencies.insert(dependencies.begin(), dependencies.end());
info->Participants.insert(participants.begin(), participants.end());
info->ChangeGroup = changeGroup;
+ info->CommitOrder = NextCommitOrder++;
+ info->CommitOrdered = commitOrdered;
if (info->Participants.empty()) {
// Transaction is committed when we don't have to wait for other participants
@@ -419,6 +449,7 @@ namespace NKikimr::NDataShard {
}
VolatileTxByVersion.insert(info);
+ VolatileTxByCommitOrder.PushBack(info);
if (!TxMap) {
TxMap = MakeIntrusive<TTxMap>();
@@ -461,6 +492,11 @@ namespace NKikimr::NDataShard {
details.SetChangeGroup(*info->ChangeGroup);
}
+ details.SetCommitOrder(info->CommitOrder);
+ if (info->CommitOrdered) {
+ details.SetCommitOrdered(true);
+ }
+
db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Update(
NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State),
NIceDb::TUpdate<Schema::TxVolatileDetails::Details>(std::move(details)));
@@ -481,7 +517,7 @@ namespace NKikimr::NDataShard {
Y_VERIFY_S(false, "Unexpected rollback of volatile txId# " << txId);
});
- if (info->State == EVolatileTxState::Committed && info->Dependencies.empty()) {
+ if (ReadyToDbCommit(info)) {
AddPendingCommit(info->TxId);
}
}
@@ -507,6 +543,7 @@ namespace NKikimr::NDataShard {
Y_VERIFY_S(info->Dependencies.empty(), "Unexpected remove of volatile tx " << txId << " with dependencies");
Y_VERIFY_S(info->Dependents.empty(), "Unexpected remove of volatile tx " << txId << " with dependents");
+ Y_VERIFY_S(info->Empty(), "Unexpected remove of volatile tx " << txId << " which is in commit order linked list");
UnblockWaitingRemovalOperations(info);
@@ -616,6 +653,10 @@ namespace NKikimr::NDataShard {
// We will unblock operations when we persist the abort
AddPendingAbort(txId);
+
+ // Note that abort is always enqueued, never executed immediately,
+ // so it is safe to use info in this call.
+ RemoveFromCommitOrder(info);
}
void TVolatileTxManager::ProcessReadSet(
@@ -703,7 +744,7 @@ namespace NKikimr::NDataShard {
if (info->AddCommitted) {
RunCommitCallbacks(info);
}
- if (info->Dependencies.empty()) {
+ if (info->Dependencies.empty() && ReadyToDbCommit(info)) {
AddPendingCommit(txId);
}
}
@@ -820,4 +861,30 @@ namespace NKikimr::NDataShard {
}
}
+ void TVolatileTxManager::RemoveFromCommitOrder(TVolatileTxInfo* info) {
+ Y_VERIFY(!info->Empty(), "Volatile transaction is not in a commit order linked list");
+ Y_VERIFY(!VolatileTxByCommitOrder.Empty(), "Commit order linked list is unexpectedly empty");
+ const bool wasFirst = VolatileTxByCommitOrder.Front() == info;
+ info->Unlink();
+ if (wasFirst && !VolatileTxByCommitOrder.Empty()) {
+ auto* next = VolatileTxByCommitOrder.Front();
+ if (next->CommitOrdered && ReadyToDbCommit(next)) {
+ AddPendingCommit(next->TxId);
+ }
+ }
+ }
+
+ bool TVolatileTxManager::ReadyToDbCommit(TVolatileTxInfo* info) const {
+ if (info->State == EVolatileTxState::Committed && info->Dependencies.empty()) {
+ if (info->CommitOrdered) {
+ Y_VERIFY_DEBUG(!VolatileTxByCommitOrder.Empty());
+ return VolatileTxByCommitOrder.Front() == info;
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index 6af069540ec..627c0c24236 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -5,6 +5,7 @@
#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/generic/hash.h>
+#include <util/generic/intrlist.h>
namespace NKikimr::NTabletFlatExecutor {
@@ -40,16 +41,20 @@ namespace NKikimr::NDataShard {
virtual void OnAbort(ui64 txId) = 0;
};
- struct TVolatileTxInfo {
+ struct TVolatileTxInfo
+ : public TIntrusiveListItem<TVolatileTxInfo>
+ {
+ ui64 CommitOrder;
ui64 TxId;
EVolatileTxState State = EVolatileTxState::Waiting;
+ bool AddCommitted = false;
+ bool CommitOrdered = false;
TRowVersion Version;
absl::flat_hash_set<ui64> CommitTxIds;
absl::flat_hash_set<ui64> Dependencies;
absl::flat_hash_set<ui64> Dependents;
absl::flat_hash_set<ui64> Participants;
std::optional<ui64> ChangeGroup;
- bool AddCommitted = false;
absl::flat_hash_set<ui64> BlockedOperations;
absl::flat_hash_set<ui64> WaitingRemovalOperations;
TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks;
@@ -177,6 +182,7 @@ namespace NKikimr::NDataShard {
TConstArrayRef<ui64> dependencies,
TConstArrayRef<ui64> participants,
std::optional<ui64> changeGroup,
+ bool commitOrdered,
TTransactionContext& txc);
bool AttachVolatileTxCallback(
@@ -222,15 +228,20 @@ namespace NKikimr::NDataShard {
void RunPendingCommitTx();
void RunPendingAbortTx();
+ void RemoveFromCommitOrder(TVolatileTxInfo* info);
+ bool ReadyToDbCommit(TVolatileTxInfo* info) const;
+
private:
TDataShard* const Self;
absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info
absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info
TVolatileTxByVersion VolatileTxByVersion;
+ TIntrusiveList<TVolatileTxInfo> VolatileTxByCommitOrder;
std::vector<TWaitingSnapshotEvent> WaitingSnapshotEvents;
TIntrusivePtr<TTxMap> TxMap;
std::deque<ui64> PendingCommits;
std::deque<ui64> PendingAborts;
+ ui64 NextCommitOrder = 1;
bool PendingCommitTxScheduled = false;
bool PendingAbortTxScheduled = false;
};