aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-09-28 01:19:32 +0300
committersnaury <snaury@ydb.tech>2022-09-28 01:19:32 +0300
commit565cb58f95c7dfbc05d2a6d3a0c16222141026a1 (patch)
treeacd8739aeb5ab90b173d1f5722855d6159536d1d
parentd1371b28b9627ce1a09d7934ecc8697c3a087c4a (diff)
downloadydb-565cb58f95c7dfbc05d2a6d3a0c16222141026a1.tar.gz
Prevent lock reuse after commit or rollback
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp36
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp45
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_locks.cpp111
-rw-r--r--ydb/core/tx/datashard/datashard_locks.h90
-rw-r--r--ydb/core/tx/datashard/datashard_ut_locks.cpp18
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp92
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp33
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp37
-rw-r--r--ydb/core/tx/datashard/setup_sys_locks.h12
13 files changed, 378 insertions, 114 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 41f31732865..ddc9e83714c 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -391,7 +391,9 @@ public:
return DataShardSysTable(tableId).SelectRow(row, columnIds, returnType, readTarget, holderFactory);
}
- Self->SysLocksTable().SetLock(tableId, row, LockTxId, LockNodeId);
+ if (LockTxId) {
+ Self->SysLocksTable().SetLock(tableId, row);
+ }
Self->SetTableAccessTime(tableId, Now);
return TEngineHost::SelectRow(tableId, row, columnIds, returnType, readTarget, holderFactory);
@@ -404,7 +406,9 @@ public:
{
Y_VERIFY(!TSysTables::IsSystemTable(tableId), "SelectRange no system table is not supported");
- Self->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId);
+ if (LockTxId) {
+ Self->SysLocksTable().SetLock(tableId, range);
+ }
Self->SetTableAccessTime(tableId, Now);
return TEngineHost::SelectRange(tableId, range, columnIds, skipNullKeys, returnType, readTarget,
@@ -420,9 +424,9 @@ public:
CheckWriteConflicts(tableId, row);
if (LockTxId) {
- Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId);
+ Self->SysLocksTable().SetWriteLock(tableId, row);
} else {
- Self->SysLocksTable().BreakLock(tableId, row);
+ Self->SysLocksTable().BreakLocks(tableId, row);
}
Self->SetTableUpdateTime(tableId, Now);
@@ -478,9 +482,9 @@ public:
CheckWriteConflicts(tableId, row);
if (LockTxId) {
- Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId);
+ Self->SysLocksTable().SetWriteLock(tableId, row);
} else {
- Self->SysLocksTable().BreakLock(tableId, row);
+ Self->SysLocksTable().BreakLocks(tableId, row);
}
Self->SetTableUpdateTime(tableId, Now);
@@ -541,7 +545,7 @@ public:
// Don't use tx map when we know there's no write lock for a table
// Note: currently write lock implies uncommitted changes
- if (!Self->SysLocksTable().HasWriteLock(LockTxId, tableId)) {
+ if (!Self->SysLocksTable().HasCurrentWriteLock(tableId)) {
return nullptr;
}
@@ -610,16 +614,18 @@ public:
void AddReadConflict(const TTableId& tableId, ui64 txId) const {
Y_UNUSED(tableId);
- if (LockTxId) {
- // We have detected uncommitted changes in txId that could affect
- // our read result. We arrange a conflict that breaks our lock
- // when txId commits.
- Self->SysLocksTable().AddReadConflict(txId, LockTxId, LockNodeId);
- }
+ Y_VERIFY(LockTxId);
+
+ // We have detected uncommitted changes in txId that could affect
+ // our read result. We arrange a conflict that breaks our lock
+ // when txId commits.
+ Self->SysLocksTable().AddReadConflict(txId);
}
void CheckReadConflict(const TTableId& tableId, const TRowVersion& rowVersion) const {
Y_UNUSED(tableId);
+ Y_VERIFY(LockTxId);
+
if (rowVersion > ReadVersion) {
// We are reading from snapshot at ReadVersion and should not normally
// observe changes with a version above that. However, if we have an
@@ -628,7 +634,7 @@ public:
// snapshot. This is a clear indication of a conflict between read
// and that future conflict, hence we must break locks and abort.
// TODO: add an actual abort
- Self->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId);
+ Self->SysLocksTable().BreakSetLocks();
EngineBay.GetKqpComputeCtx().SetInconsistentReads();
}
}
@@ -696,7 +702,7 @@ public:
void AddWriteConflict(const TTableId& tableId, ui64 txId) const {
Y_UNUSED(tableId);
if (LockTxId) {
- Self->SysLocksTable().AddWriteConflict(txId, LockTxId, LockNodeId);
+ Self->SysLocksTable().AddWriteConflict(txId);
} else {
Self->SysLocksTable().BreakLock(txId);
}
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 75c346ea865..c4297970d0c 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -599,7 +599,7 @@ private:
if (!TxMap &&
State.LockId &&
!TSysTables::IsSystemTable(State.PathId) &&
- Self->SysLocksTable().HasWriteLock(State.LockId, State.PathId))
+ Self->SysLocksTable().HasCurrentWriteLock(State.PathId))
{
TxMap = new NTable::TSingleTransactionMap(State.LockId, TRowVersion::Min());
}
@@ -655,7 +655,7 @@ private:
// We have skipped uncommitted changes in txId, which would affect
// the read result when it commits. Add a conflict edge that breaks
// our lock when txId is committed.
- Self->SysLocksTable().AddReadConflict(txId, State.LockId, State.LockNodeId);
+ Self->SysLocksTable().AddReadConflict(txId);
}
void CheckReadConflict(const TRowVersion& rowVersion) {
@@ -843,7 +843,34 @@ public:
state.LockNodeId = state.Request->Record.GetLockNodeId();
TDataShardLocksDb locksDb(*Self, txc);
- TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb);
+ TSetupSysLocks guardLocks(state.LockId, state.LockNodeId, *Self, &locksDb);
+
+ if (guardLocks.LockTxId) {
+ switch (Self->SysLocksTable().EnsureCurrentLock()) {
+ case EEnsureCurrentLock::Success:
+ // Lock is valid, we may continue with reads and side-effects
+ break;
+
+ case EEnsureCurrentLock::Broken:
+ // Lock is valid, but broken, we could abort early in some
+ // cases, but it doesn't affect correctness.
+ break;
+
+ case EEnsureCurrentLock::TooMany:
+ // Lock cannot be created, it's not necessarily a problem
+ // for read-only transactions.
+ break;
+
+ case EEnsureCurrentLock::Abort:
+ // Lock cannot be created and we must abort
+ SendErrorAndAbort(
+ ctx,
+ state,
+ Ydb::StatusIds::ABORTED,
+ TStringBuilder() << "Transaction was already committed or aborted");
+ return EExecutionStatus::DelayComplete;
+ }
+ }
if (!Read(txc, ctx, state))
return EExecutionStatus::Restart;
@@ -891,7 +918,7 @@ public:
hadWrites |= locksDb.HasChanges();
// We remember acquired lock for faster checking
- state.Lock = guard.Lock;
+ state.Lock = guardLocks.Lock;
}
if (!Self->IsFollower()) {
@@ -1392,21 +1419,21 @@ private:
true,
key.GetCells(),
true);
- sysLocks.SetLock(tableId, lockRange, state.LockId, state.LockNodeId);
+ sysLocks.SetLock(tableId, lockRange);
} else {
- sysLocks.SetLock(tableId, key.GetCells(), state.LockId, state.LockNodeId);
+ sysLocks.SetLock(tableId, key.GetCells());
}
}
} else {
// no keys, so we must have ranges (has been checked initially)
for (size_t i = 0; i < state.Request->Ranges.size(); ++i) {
auto range = state.Request->Ranges[i].ToTableRange();
- sysLocks.SetLock(tableId, range, state.LockId, state.LockNodeId);
+ sysLocks.SetLock(tableId, range);
}
}
if (Reader->HadInvisibleRowSkips() || Reader->HadInconsistentResult()) {
- sysLocks.BreakSetLocks(state.LockId, state.LockNodeId);
+ sysLocks.BreakSetLocks();
}
auto locks = sysLocks.ApplyLocks();
@@ -1684,7 +1711,7 @@ public:
<< ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery);
TDataShardLocksDb locksDb(*Self, txc);
- TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb);
+ TSetupSysLocks guardLocks(state.LockId, state.LockNodeId, *Self, &locksDb);
Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self));
if (Reader->Read(txc, ctx)) {
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index f9506c55f60..dd138f7de82 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -200,7 +200,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
}
- self->SysLocksTable().BreakLock(fullTableId, keyCells.GetCells());
+ self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
}
}
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 060c9e79beb..054bedfc2b8 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -154,7 +154,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
continue;
}
- self->SysLocksTable().BreakLock(fullTableId, keyCells.GetCells());
+ self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion);
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index e85c6f81b94..50662025ef1 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -137,17 +137,23 @@ const NDataShard::TUserTable* TKqpDatashardComputeContext::GetTable(const TTable
}
void TKqpDatashardComputeContext::TouchTableRange(const TTableId& tableId, const TTableRange& range) const {
- Shard->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId);
+ if (LockTxId) {
+ Shard->SysLocksTable().SetLock(tableId, range);
+ }
Shard->SetTableAccessTime(tableId, Now);
}
void TKqpDatashardComputeContext::TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const {
- Shard->SysLocksTable().SetLock(tableId, key, LockTxId, LockNodeId);
+ if (LockTxId) {
+ Shard->SysLocksTable().SetLock(tableId, key);
+ }
Shard->SetTableAccessTime(tableId, Now);
}
void TKqpDatashardComputeContext::BreakSetLocks() const {
- Shard->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId);
+ if (LockTxId) {
+ Shard->SysLocksTable().BreakSetLocks();
+ }
}
void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) {
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index cffc2a03409..686a8c63265 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -779,7 +779,11 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
Locker.ForceBreakLock(Update->LockTxId);
Locker.SaveBrokenPersistentLocks(Db);
} else {
- lock = Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId);
+ if (Update->Lock) {
+ lock = std::move(Update->Lock);
+ } else {
+ lock = Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId);
+ }
if (!lock) {
counter = TLock::ErrorTooMuch;
} else if (lock->IsBroken()) {
@@ -933,41 +937,38 @@ void TSysLocks::CommitLock(const TArrayRef<const TCell>& key) {
}
}
-void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
+void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+ Y_VERIFY(Update && Update->LockTxId);
Y_VERIFY(!TSysTables::IsSystemTable(tableId));
if (!Self->IsUserTable(tableId))
return;
- if (lockTxId) {
- Y_VERIFY(Update);
- Update->AddPointLock(Locker.MakePoint(tableId, key), lockTxId, lockNodeId);
- }
+ Update->AddPointLock(Locker.MakePoint(tableId, key));
}
-void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId) {
+void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range) {
if (range.Point) { // if range is point replace it with a point lock
- SetLock(tableId, range.From, lockTxId, lockNodeId);
+ SetLock(tableId, range.From);
return;
}
+ Y_VERIFY(Update && Update->LockTxId);
Y_VERIFY(!TSysTables::IsSystemTable(tableId));
if (!Self->IsUserTable(tableId))
return;
- if (lockTxId) {
- Y_VERIFY(Update);
- Update->AddRangeLock(Locker.MakeRange(tableId, range), lockTxId, lockNodeId);
- }
+ Update->AddRangeLock(Locker.MakeRange(tableId, range));
}
-void TSysLocks::SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
+void TSysLocks::SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+ Y_VERIFY(Update && Update->LockTxId);
Y_VERIFY(!TSysTables::IsSystemTable(tableId));
if (!Self->IsUserTable(tableId))
return;
if (auto* table = Locker.FindTablePtr(tableId)) {
- Update->AddWriteLock(table, lockTxId, lockNodeId);
- AddWriteConflict(tableId, key, lockTxId, lockNodeId);
+ Update->AddWriteLock(table);
+ AddWriteConflict(tableId, key);
}
}
@@ -977,7 +978,7 @@ void TSysLocks::BreakLock(ui64 lockId) {
}
}
-void TSysLocks::BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+void TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key) {
Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks)));
if (auto* table = Locker.FindTablePtr(tableId)) {
@@ -994,29 +995,28 @@ void TSysLocks::BreakLock(const TTableId& tableId, const TArrayRef<const TCell>&
}
}
-void TSysLocks::AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) {
- Y_UNUSED(lockNodeId);
+void TSysLocks::AddReadConflict(ui64 conflictId) {
+ Y_VERIFY(Update && Update->LockTxId);
- if (conflictId != lockTxId) {
+ if (conflictId != Update->LockTxId) {
if (auto* lock = Locker.FindLockPtr(conflictId)) {
Update->AddReadConflictLock(lock);
}
}
}
-void TSysLocks::AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) {
- Y_UNUSED(lockNodeId);
+void TSysLocks::AddWriteConflict(ui64 conflictId) {
+ Y_VERIFY(Update && Update->LockTxId);
- if (conflictId != lockTxId) {
+ if (conflictId != Update->LockTxId) {
if (auto* lock = Locker.FindLockPtr(conflictId)) {
Update->AddWriteConflictLock(lock);
}
}
}
-void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
- Y_UNUSED(lockTxId);
- Y_UNUSED(lockNodeId);
+void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+ Y_VERIFY(Update && Update->LockTxId);
if (auto* table = Locker.FindTablePtr(tableId)) {
if (table->HasRangeLocks()) {
@@ -1035,6 +1035,7 @@ void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const
}
void TSysLocks::BreakAllLocks(const TTableId& tableId) {
+ Y_VERIFY(Update);
Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks)));
if (!Self->IsUserTable(tableId))
return;
@@ -1046,11 +1047,10 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) {
}
}
-void TSysLocks::BreakSetLocks(ui64 lockTxId, ui32 lockNodeId) {
- Y_VERIFY(Update);
+void TSysLocks::BreakSetLocks() {
+ Y_VERIFY(Update && Update->LockTxId);
- if (lockTxId)
- Update->BreakSetLocks(lockTxId, lockNodeId);
+ Update->BreakSetLocks();
}
bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const {
@@ -1059,8 +1059,10 @@ bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const {
return ok && (Self->TabletID() == tabletId);
}
-bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const {
- if (Update && Update->LockTxId == lockId && Update->WriteTables) {
+bool TSysLocks::HasCurrentWriteLock(const TTableId& tableId) const {
+ Y_VERIFY(Update && Update->LockTxId);
+
+ if (Update->WriteTables) {
if (auto* table = Locker.FindTablePtr(tableId.PathId)) {
if (table->IsInList<TTableLocksWriteListTag>()) {
return true;
@@ -1068,7 +1070,7 @@ bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const {
}
}
- if (auto* lock = Locker.FindLockPtr(lockId)) {
+ if (auto* lock = Locker.FindLockPtr(Update->LockTxId)) {
if (lock->WriteTables.contains(tableId.PathId)) {
return true;
}
@@ -1077,6 +1079,22 @@ bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const {
return false;
}
+bool TSysLocks::HasCurrentWriteLocks() const {
+ Y_VERIFY(Update && Update->LockTxId);
+
+ if (Update->WriteTables) {
+ return true;
+ }
+
+ if (auto* lock = Locker.FindLockPtr(Update->LockTxId)) {
+ if (lock->IsWriteLock()) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
bool TSysLocks::HasWriteLocks(const TTableId& tableId) const {
if (Update && Update->WriteTables) {
return true;
@@ -1091,14 +1109,31 @@ bool TSysLocks::HasWriteLocks(const TTableId& tableId) const {
return false;
}
-bool TSysLocks::MayAddLock(ui64 lockId) const {
- if (auto* lock = Locker.FindLockPtr(lockId)) {
- // We may expand the lock unless it's broken
- return !lock->IsBroken();
+EEnsureCurrentLock TSysLocks::EnsureCurrentLock() {
+ Y_VERIFY(Update && Update->LockTxId);
+ Y_VERIFY(Db, "EnsureCurrentLock needs a valid locks database");
+
+ if (auto* lock = Locker.FindLockPtr(Update->LockTxId)) {
+ // We cannot expand a broken lock
+ if (lock->IsBroken()) {
+ return EEnsureCurrentLock::Broken;
+ }
+
+ Update->Lock = lock;
+
+ return EEnsureCurrentLock::Success;
+ }
+
+ if (!Db->MayAddLock(Update->LockTxId)) {
+ return EEnsureCurrentLock::Abort;
+ }
+
+ Update->Lock = Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId);
+ if (!Update->Lock) {
+ return EEnsureCurrentLock::TooMany;
}
- Y_VERIFY(Db, "MayAddLock needs a valid locks database");
- return Db->MayAddLock(lockId);
+ return EEnsureCurrentLock::Success;
}
TSysLocks::TLock TSysLocks::MakeLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const {
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index d33a9777567..5acb2690e07 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -259,6 +259,14 @@ public:
TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
~TLockInfo();
+ bool Empty() const {
+ return !(
+ IsPersistent() ||
+ !ReadTables.empty() ||
+ !WriteTables.empty() ||
+ IsBroken());
+ }
+
template<class TTag>
bool IsInList() const {
using TItem = TIntrusiveListItem<TLockInfo, TTag>;
@@ -620,22 +628,19 @@ struct TLocksUpdate {
return bool(AffectedTables) || bool(ReadConflictLocks) || bool(WriteConflictLocks);
}
- void AddRangeLock(const TRangeKey& range, ui64 lockId, ui32 lockNodeId) {
- Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
+ void AddRangeLock(const TRangeKey& range) {
ReadTables.PushBack(range.Table.Get());
AffectedTables.PushBack(range.Table.Get());
RangeLocks.push_back(range);
}
- void AddPointLock(const TPointKey& key, ui64 lockId, ui32 lockNodeId) {
- Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
+ void AddPointLock(const TPointKey& key) {
ReadTables.PushBack(key.Table.Get());
AffectedTables.PushBack(key.Table.Get());
PointLocks.push_back(key);
}
- void AddWriteLock(TTableLocks* table, ui64 lockId, ui32 lockNodeId) {
- Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
+ void AddWriteLock(TTableLocks* table) {
WriteTables.PushBack(table);
AffectedTables.PushBack(table);
}
@@ -668,8 +673,7 @@ struct TLocksUpdate {
EraseLocks.PushBack(lock);
}
- void BreakSetLocks(ui64 lockId, ui32 lockNodeId) {
- Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
+ void BreakSetLocks() {
BreakOwn = true;
}
};
@@ -678,6 +682,22 @@ struct TLocksCache {
THashMap<ui64, TSysTables::TLocksTable::TLock> Locks;
};
+enum class EEnsureCurrentLock {
+ // New lock was created or an existing unbroken lock was found.
+ Success,
+ // Lock was already broken, this is not an error for some operations, e.g.
+ // readonly snapshot operations may usually continue until they write.
+ Broken,
+ // Some constraint prevents adding new lock, e.g. there are too many locks
+ // or not enough memory. This is usually similar to Broken.
+ TooMany,
+ // Operation must abort due to temporal violation. This may happen when
+ // transaction was already marked committed or aborted in the transaction
+ // map, but not yet fully compacted. New reads and especially writes may
+ // cause inconsistencies or data corruption and cannot be performed.
+ Abort,
+};
+
/// /sys/locks table logic
class TSysLocks {
public:
@@ -690,8 +710,21 @@ public:
, Locker(self)
{}
- void SetTxUpdater(TLocksUpdate* up) {
- Update = up;
+ void SetupUpdate(TLocksUpdate* update, ILocksDb* db = nullptr) noexcept {
+ Y_VERIFY(!Update, "Cannot setup a recursive update");
+ Y_VERIFY(update, "Cannot setup a nullptr update");
+ Update = update;
+ Db = db;
+ }
+
+ void ResetUpdate() noexcept {
+ if (Y_LIKELY(Update)) {
+ if (Update->Lock && Update->Lock->Empty()) {
+ Locker.RemoveLock(Update->LockTxId, nullptr);
+ }
+ Update = nullptr;
+ Db = nullptr;
+ }
}
void SetAccessLog(TLocksCache* log) {
@@ -702,10 +735,6 @@ public:
Cache = cache;
}
- void SetDb(ILocksDb* db) {
- Db = db;
- }
-
ui64 CurrentLockTxId() const {
Y_VERIFY(Update);
return Update->LockTxId;
@@ -724,20 +753,33 @@ public:
TLock GetLock(const TArrayRef<const TCell>& syslockKey) const;
void EraseLock(const TArrayRef<const TCell>& syslockKey);
void CommitLock(const TArrayRef<const TCell>& syslockKey);
- void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
- void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId);
- void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
+ void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
+ void SetLock(const TTableId& tableId, const TTableRange& range);
+ void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
void BreakLock(ui64 lockId);
- void BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
- void AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId);
- void AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId);
- void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
+ void BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key);
+ void AddReadConflict(ui64 conflictId);
+ void AddWriteConflict(ui64 conflictId);
+ void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key);
void BreakAllLocks(const TTableId& tableId);
- void BreakSetLocks(ui64 lockTxId, ui32 lockNodeId);
+ void BreakSetLocks();
bool IsMyKey(const TArrayRef<const TCell>& key) const;
- bool HasWriteLock(ui64 lockId, const TTableId& tableId) const;
+ bool HasCurrentWriteLock(const TTableId& tableId) const;
+ bool HasCurrentWriteLocks() const;
bool HasWriteLocks(const TTableId& tableId) const;
- bool MayAddLock(ui64 lockId) const;
+
+ /**
+ * Ensures current update has a valid lock pointer
+ *
+ * Prerequisites: TSetupSysLocks with LockId and LocksDb is active, and
+ * operation is planning to set read or write locks.
+ *
+ * Returns Success when a new lock is allocated or an existing (unbroken)
+ * lock is found. Returns Broken when the existing lock is already broken.
+ * Returns TooMany when a new lock cannot be added, e.g. due to memory or
+ * other constraints. Returns Abort when the operation must abort early,
+ * e.g. because the given LockId cannot be reused.
+ */
+ EEnsureCurrentLock EnsureCurrentLock();
ui64 LocksCount() const { return Locker.LocksCount(); }
ui64 BrokenLocksCount() const { return Locker.BrokenLocksCount(); }
diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp
index e7724d352f0..8f15575378a 100644
--- a/ydb/core/tx/datashard/datashard_ut_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp
@@ -159,21 +159,21 @@ namespace NTest {
template <typename T>
void SetLock(const TPointKey<T>& key) {
- Locks.SetLock(TableId, key.GetRow(), LockId(), 0);
+ Locks.SetLock(TableId, key.GetRow());
}
template <typename T>
void SetLock(const TRangeKey<T>& range) {
- Locks.SetLock(TableId, range.GetRowsRange(), LockId(), 0);
+ Locks.SetLock(TableId, range.GetRowsRange());
}
template <typename T>
- void BreakLock(const TPointKey<T>& key) {
- Locks.BreakLock(TableId, key.GetRow());
+ void BreakLocks(const TPointKey<T>& key) {
+ Locks.BreakLocks(TableId, key.GetRow());
}
void BreakSetLocks() {
- Locks.BreakSetLocks(LockId(), 0);
+ Locks.BreakSetLocks();
}
//
@@ -191,17 +191,17 @@ namespace NTest {
void StartTx(TLocksUpdate& update) {
update.LockTxId = 0;
- Locks.SetTxUpdater(&update);
+ Locks.SetupUpdate(&update);
}
void StartTx(ui64 lockTxId, TLocksUpdate& update) {
update.LockTxId = lockTxId;
- Locks.SetTxUpdater(&update);
+ Locks.SetupUpdate(&update);
}
TVector<TSysLocks::TLock> ApplyTxLocks() {
auto locks = Locks.ApplyLocks();
- Locks.SetTxUpdater(nullptr);
+ Locks.ResetUpdate();
return locks;
}
@@ -233,7 +233,7 @@ namespace NTest {
template <typename T>
void Update(const TVector<T>& updates) {
for (auto& value : updates) {
- BreakLock(TLockTester::TPointKey<T>(value));
+ BreakLocks(TLockTester::TPointKey<T>(value));
}
}
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 0cd1e6aec2f..b75720bb163 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -2603,6 +2603,98 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
observer.InjectClearTasks = false;
observer.InjectLocks.reset();
}
+
+ Y_UNIT_TEST(LockedWriteReuseAfterCommit) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Write uncommitted changes to key 3 with tx 123
+ // The lock for tx 123 was committed and removed, and cannot be reused
+ // until all changes are fully compacted. Otherwise new changes will
+ // appear as immediately committed in the past.
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 31)
+ )")),
+ "ERROR: ABORTED");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+ }
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index 4f9d2066994..29a8a625806 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -121,6 +121,35 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
return EExecutionStatus::Restart;
}
engine->SetMemoryLimit(txc.GetMemoryLimit() - tx->GetDataTx()->GetTxSize());
+
+ if (guardLocks.LockTxId) {
+ switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
+ case EEnsureCurrentLock::Success:
+ // Lock is valid, we may continue with reads and side-effects
+ break;
+
+ case EEnsureCurrentLock::Broken:
+ // Lock is valid, but broken, we could abort early in some
+ // cases, but it doesn't affect correctness.
+ break;
+
+ case EEnsureCurrentLock::TooMany:
+ // Lock cannot be created, it's not necessarily a problem
+ // for read-only transactions, for non-readonly we need to
+ // abort.
+ if (op->IsReadOnly()) {
+ break;
+ }
+
+ [[fallthrough]];
+
+ case EEnsureCurrentLock::Abort:
+ // Lock cannot be created and we must abort
+ op->SetAbortedFlag();
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
+ return EExecutionStatus::Executed;
+ }
+ }
}
try {
@@ -260,8 +289,8 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
KqpFillTxStats(DataShard, counters, *result);
}
- if (counters.InvisibleRowSkips) {
- DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId());
+ if (counters.InvisibleRowSkips && op->LockTxId()) {
+ DataShard.SysLocksTable().BreakSetLocks();
}
AddLocksToResult(op, ctx);
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index f2c6bc2f808..0059180e3ae 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -140,7 +140,7 @@ public:
continue;
}
- DataShard.SysLocksTable().BreakLock(fullTableId, keyCells.GetCells());
+ DataShard.SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion);
}
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 385189adafe..81a30fb91fd 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -121,7 +121,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
return EExecutionStatus::Executed;
}
-
try {
auto& kqpTx = dataTx->GetKqpTransaction();
auto& tasksRunner = dataTx->GetKqpTasksRunner();
@@ -135,6 +134,38 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
return EExecutionStatus::Restart;
}
+ if (guardLocks.LockTxId) {
+ switch (DataShard.SysLocksTable().EnsureCurrentLock()) {
+ case EEnsureCurrentLock::Success:
+ // Lock is valid, we may continue with reads and side-effects
+ break;
+
+ case EEnsureCurrentLock::Broken:
+ // Lock is valid, but broken, we could abort early in some
+ // cases, but it doesn't affect correctness.
+ break;
+
+ case EEnsureCurrentLock::TooMany:
+ // Lock cannot be created, it's not necessarily a problem
+ // for read-only transactions, for non-readonly we need to
+ // abort.
+ if (op->IsReadOnly()) {
+ break;
+ }
+
+ [[fallthrough]];
+
+ case EEnsureCurrentLock::Abort:
+ // Lock cannot be created and we must abort
+ LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
+ << " aborting because it cannot acquire locks");
+
+ op->SetAbortedFlag();
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
+ return EExecutionStatus::Executed;
+ }
+ }
+
if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) {
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
DataShard.SysLocksTable().ApplyLocks();
@@ -229,8 +260,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
op->Result().Swap(result);
op->SetKqpAttachedRSFlag();
- if (dataTx->GetCounters().InvisibleRowSkips) {
- DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId());
+ if (dataTx->GetCounters().InvisibleRowSkips && op->LockTxId()) {
+ DataShard.SysLocksTable().BreakSetLocks();
}
AddLocksToResult(op, ctx);
diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h
index f10426c2a83..5a4c7eb65c3 100644
--- a/ydb/core/tx/datashard/setup_sys_locks.h
+++ b/ydb/core/tx/datashard/setup_sys_locks.h
@@ -17,8 +17,7 @@ struct TSetupSysLocks
CheckVersion = TRowVersion::Max();
BreakVersion = TRowVersion::Min();
- SysLocksTable.SetTxUpdater(this);
- SysLocksTable.SetDb(db);
+ SysLocksTable.SetupUpdate(this, db);
}
TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, TDataShard& self, ILocksDb* db)
@@ -29,8 +28,7 @@ struct TSetupSysLocks
CheckVersion = TRowVersion::Max();
BreakVersion = TRowVersion::Min();
- SysLocksTable.SetTxUpdater(this);
- SysLocksTable.SetDb(db);
+ SysLocksTable.SetupUpdate(this, db);
}
TSetupSysLocks(TOperation::TPtr op,
@@ -55,19 +53,17 @@ struct TSetupSysLocks
BreakVersion = outOfOrder ? writeVersion : TRowVersion::Min();
}
- SysLocksTable.SetTxUpdater(this);
if (!op->LocksCache().Locks.empty())
SysLocksTable.SetCache(&op->LocksCache());
else
SysLocksTable.SetAccessLog(&op->LocksAccessLog());
- SysLocksTable.SetDb(db);
+ SysLocksTable.SetupUpdate(this, db);
}
~TSetupSysLocks() {
- SysLocksTable.SetTxUpdater(nullptr);
+ SysLocksTable.ResetUpdate();
SysLocksTable.SetCache(nullptr);
SysLocksTable.SetAccessLog(nullptr);
- SysLocksTable.SetDb(nullptr);
}
};