diff options
author | snaury <snaury@ydb.tech> | 2022-09-28 01:19:32 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-09-28 01:19:32 +0300 |
commit | 565cb58f95c7dfbc05d2a6d3a0c16222141026a1 (patch) | |
tree | acd8739aeb5ab90b173d1f5722855d6159536d1d | |
parent | d1371b28b9627ce1a09d7934ecc8697c3a087c4a (diff) | |
download | ydb-565cb58f95c7dfbc05d2a6d3a0c16222141026a1.tar.gz |
Prevent lock reuse after commit or rollback
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 36 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 45 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_common_upload.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_direct_erase.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.cpp | 111 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.h | 90 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_locks.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 92 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_data_tx_unit.cpp | 33 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 37 | ||||
-rw-r--r-- | ydb/core/tx/datashard/setup_sys_locks.h | 12 |
13 files changed, 378 insertions, 114 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 41f31732865..ddc9e83714c 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -391,7 +391,9 @@ public: return DataShardSysTable(tableId).SelectRow(row, columnIds, returnType, readTarget, holderFactory); } - Self->SysLocksTable().SetLock(tableId, row, LockTxId, LockNodeId); + if (LockTxId) { + Self->SysLocksTable().SetLock(tableId, row); + } Self->SetTableAccessTime(tableId, Now); return TEngineHost::SelectRow(tableId, row, columnIds, returnType, readTarget, holderFactory); @@ -404,7 +406,9 @@ public: { Y_VERIFY(!TSysTables::IsSystemTable(tableId), "SelectRange no system table is not supported"); - Self->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId); + if (LockTxId) { + Self->SysLocksTable().SetLock(tableId, range); + } Self->SetTableAccessTime(tableId, Now); return TEngineHost::SelectRange(tableId, range, columnIds, skipNullKeys, returnType, readTarget, @@ -420,9 +424,9 @@ public: CheckWriteConflicts(tableId, row); if (LockTxId) { - Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId); + Self->SysLocksTable().SetWriteLock(tableId, row); } else { - Self->SysLocksTable().BreakLock(tableId, row); + Self->SysLocksTable().BreakLocks(tableId, row); } Self->SetTableUpdateTime(tableId, Now); @@ -478,9 +482,9 @@ public: CheckWriteConflicts(tableId, row); if (LockTxId) { - Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId); + Self->SysLocksTable().SetWriteLock(tableId, row); } else { - Self->SysLocksTable().BreakLock(tableId, row); + Self->SysLocksTable().BreakLocks(tableId, row); } Self->SetTableUpdateTime(tableId, Now); @@ -541,7 +545,7 @@ public: // Don't use tx map when we know there's no write lock for a table // Note: currently write lock implies uncommitted changes - if (!Self->SysLocksTable().HasWriteLock(LockTxId, tableId)) { + if 
(!Self->SysLocksTable().HasCurrentWriteLock(tableId)) { return nullptr; } @@ -610,16 +614,18 @@ public: void AddReadConflict(const TTableId& tableId, ui64 txId) const { Y_UNUSED(tableId); - if (LockTxId) { - // We have detected uncommitted changes in txId that could affect - // our read result. We arrange a conflict that breaks our lock - // when txId commits. - Self->SysLocksTable().AddReadConflict(txId, LockTxId, LockNodeId); - } + Y_VERIFY(LockTxId); + + // We have detected uncommitted changes in txId that could affect + // our read result. We arrange a conflict that breaks our lock + // when txId commits. + Self->SysLocksTable().AddReadConflict(txId); } void CheckReadConflict(const TTableId& tableId, const TRowVersion& rowVersion) const { Y_UNUSED(tableId); + Y_VERIFY(LockTxId); + if (rowVersion > ReadVersion) { // We are reading from snapshot at ReadVersion and should not normally // observe changes with a version above that. However, if we have an @@ -628,7 +634,7 @@ public: // snapshot. This is a clear indication of a conflict between read // and that future conflict, hence we must break locks and abort. 
// TODO: add an actual abort - Self->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId); + Self->SysLocksTable().BreakSetLocks(); EngineBay.GetKqpComputeCtx().SetInconsistentReads(); } } @@ -696,7 +702,7 @@ public: void AddWriteConflict(const TTableId& tableId, ui64 txId) const { Y_UNUSED(tableId); if (LockTxId) { - Self->SysLocksTable().AddWriteConflict(txId, LockTxId, LockNodeId); + Self->SysLocksTable().AddWriteConflict(txId); } else { Self->SysLocksTable().BreakLock(txId); } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 75c346ea865..c4297970d0c 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -599,7 +599,7 @@ private: if (!TxMap && State.LockId && !TSysTables::IsSystemTable(State.PathId) && - Self->SysLocksTable().HasWriteLock(State.LockId, State.PathId)) + Self->SysLocksTable().HasCurrentWriteLock(State.PathId)) { TxMap = new NTable::TSingleTransactionMap(State.LockId, TRowVersion::Min()); } @@ -655,7 +655,7 @@ private: // We have skipped uncommitted changes in txId, which would affect // the read result when it commits. Add a conflict edge that breaks // our lock when txId is committed. 
- Self->SysLocksTable().AddReadConflict(txId, State.LockId, State.LockNodeId); + Self->SysLocksTable().AddReadConflict(txId); } void CheckReadConflict(const TRowVersion& rowVersion) { @@ -843,7 +843,34 @@ public: state.LockNodeId = state.Request->Record.GetLockNodeId(); TDataShardLocksDb locksDb(*Self, txc); - TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb); + TSetupSysLocks guardLocks(state.LockId, state.LockNodeId, *Self, &locksDb); + + if (guardLocks.LockTxId) { + switch (Self->SysLocksTable().EnsureCurrentLock()) { + case EEnsureCurrentLock::Success: + // Lock is valid, we may continue with reads and side-effects + break; + + case EEnsureCurrentLock::Broken: + // Lock is valid, but broken, we could abort early in some + // cases, but it doesn't affect correctness. + break; + + case EEnsureCurrentLock::TooMany: + // Lock cannot be created, it's not necessarily a problem + // for read-only transactions. + break; + + case EEnsureCurrentLock::Abort: + // Lock cannot be created and we must abort + SendErrorAndAbort( + ctx, + state, + Ydb::StatusIds::ABORTED, + TStringBuilder() << "Transaction was already committed or aborted"); + return EExecutionStatus::DelayComplete; + } + } if (!Read(txc, ctx, state)) return EExecutionStatus::Restart; @@ -891,7 +918,7 @@ public: hadWrites |= locksDb.HasChanges(); // We remember acquired lock for faster checking - state.Lock = guard.Lock; + state.Lock = guardLocks.Lock; } if (!Self->IsFollower()) { @@ -1392,21 +1419,21 @@ private: true, key.GetCells(), true); - sysLocks.SetLock(tableId, lockRange, state.LockId, state.LockNodeId); + sysLocks.SetLock(tableId, lockRange); } else { - sysLocks.SetLock(tableId, key.GetCells(), state.LockId, state.LockNodeId); + sysLocks.SetLock(tableId, key.GetCells()); } } } else { // no keys, so we must have ranges (has been checked initially) for (size_t i = 0; i < state.Request->Ranges.size(); ++i) { auto range = state.Request->Ranges[i].ToTableRange(); - 
sysLocks.SetLock(tableId, range, state.LockId, state.LockNodeId); + sysLocks.SetLock(tableId, range); } } if (Reader->HadInvisibleRowSkips() || Reader->HadInconsistentResult()) { - sysLocks.BreakSetLocks(state.LockId, state.LockNodeId); + sysLocks.BreakSetLocks(); } auto locks = sysLocks.ApplyLocks(); @@ -1684,7 +1711,7 @@ public: << ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery); TDataShardLocksDb locksDb(*Self, txc); - TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb); + TSetupSysLocks guardLocks(state.LockId, state.LockNodeId, *Self, &locksDb); Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self)); if (Reader->Read(txc, ctx)) { diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index f9506c55f60..dd138f7de82 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -200,7 +200,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } } - self->SysLocksTable().BreakLock(fullTableId, keyCells.GetCells()); + self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells()); } } diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index 060c9e79beb..054bedfc2b8 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -154,7 +154,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( continue; } - self->SysLocksTable().BreakLock(fullTableId, keyCells.GetCells()); + self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells()); params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion); } diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index e85c6f81b94..50662025ef1 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ 
b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -137,17 +137,23 @@ const NDataShard::TUserTable* TKqpDatashardComputeContext::GetTable(const TTable } void TKqpDatashardComputeContext::TouchTableRange(const TTableId& tableId, const TTableRange& range) const { - Shard->SysLocksTable().SetLock(tableId, range, LockTxId, LockNodeId); + if (LockTxId) { + Shard->SysLocksTable().SetLock(tableId, range); + } Shard->SetTableAccessTime(tableId, Now); } void TKqpDatashardComputeContext::TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const { - Shard->SysLocksTable().SetLock(tableId, key, LockTxId, LockNodeId); + if (LockTxId) { + Shard->SysLocksTable().SetLock(tableId, key); + } Shard->SetTableAccessTime(tableId, Now); } void TKqpDatashardComputeContext::BreakSetLocks() const { - Shard->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId); + if (LockTxId) { + Shard->SysLocksTable().BreakSetLocks(); + } } void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId, ui32 lockNodeId) { diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index cffc2a03409..686a8c63265 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -779,7 +779,11 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { Locker.ForceBreakLock(Update->LockTxId); Locker.SaveBrokenPersistentLocks(Db); } else { - lock = Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId); + if (Update->Lock) { + lock = std::move(Update->Lock); + } else { + lock = Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId); + } if (!lock) { counter = TLock::ErrorTooMuch; } else if (lock->IsBroken()) { @@ -933,41 +937,38 @@ void TSysLocks::CommitLock(const TArrayRef<const TCell>& key) { } } -void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { +void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key) { + 
Y_VERIFY(Update && Update->LockTxId); Y_VERIFY(!TSysTables::IsSystemTable(tableId)); if (!Self->IsUserTable(tableId)) return; - if (lockTxId) { - Y_VERIFY(Update); - Update->AddPointLock(Locker.MakePoint(tableId, key), lockTxId, lockNodeId); - } + Update->AddPointLock(Locker.MakePoint(tableId, key)); } -void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId) { +void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range) { if (range.Point) { // if range is point replace it with a point lock - SetLock(tableId, range.From, lockTxId, lockNodeId); + SetLock(tableId, range.From); return; } + Y_VERIFY(Update && Update->LockTxId); Y_VERIFY(!TSysTables::IsSystemTable(tableId)); if (!Self->IsUserTable(tableId)) return; - if (lockTxId) { - Y_VERIFY(Update); - Update->AddRangeLock(Locker.MakeRange(tableId, range), lockTxId, lockNodeId); - } + Update->AddRangeLock(Locker.MakeRange(tableId, range)); } -void TSysLocks::SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { +void TSysLocks::SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key) { + Y_VERIFY(Update && Update->LockTxId); Y_VERIFY(!TSysTables::IsSystemTable(tableId)); if (!Self->IsUserTable(tableId)) return; if (auto* table = Locker.FindTablePtr(tableId)) { - Update->AddWriteLock(table, lockTxId, lockNodeId); - AddWriteConflict(tableId, key, lockTxId, lockNodeId); + Update->AddWriteLock(table); + AddWriteConflict(tableId, key); } } @@ -977,7 +978,7 @@ void TSysLocks::BreakLock(ui64 lockId) { } } -void TSysLocks::BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key) { +void TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key) { Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks))); if (auto* table = Locker.FindTablePtr(tableId)) { @@ -994,29 +995,28 @@ void TSysLocks::BreakLock(const TTableId& 
tableId, const TArrayRef<const TCell>& } } -void TSysLocks::AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) { - Y_UNUSED(lockNodeId); +void TSysLocks::AddReadConflict(ui64 conflictId) { + Y_VERIFY(Update && Update->LockTxId); - if (conflictId != lockTxId) { + if (conflictId != Update->LockTxId) { if (auto* lock = Locker.FindLockPtr(conflictId)) { Update->AddReadConflictLock(lock); } } } -void TSysLocks::AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) { - Y_UNUSED(lockNodeId); +void TSysLocks::AddWriteConflict(ui64 conflictId) { + Y_VERIFY(Update && Update->LockTxId); - if (conflictId != lockTxId) { + if (conflictId != Update->LockTxId) { if (auto* lock = Locker.FindLockPtr(conflictId)) { Update->AddWriteConflictLock(lock); } } } -void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { - Y_UNUSED(lockTxId); - Y_UNUSED(lockNodeId); +void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key) { + Y_VERIFY(Update && Update->LockTxId); if (auto* table = Locker.FindTablePtr(tableId)) { if (table->HasRangeLocks()) { @@ -1035,6 +1035,7 @@ void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const } void TSysLocks::BreakAllLocks(const TTableId& tableId) { + Y_VERIFY(Update); Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks))); if (!Self->IsUserTable(tableId)) return; @@ -1046,11 +1047,10 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) { } } -void TSysLocks::BreakSetLocks(ui64 lockTxId, ui32 lockNodeId) { - Y_VERIFY(Update); +void TSysLocks::BreakSetLocks() { + Y_VERIFY(Update && Update->LockTxId); - if (lockTxId) - Update->BreakSetLocks(lockTxId, lockNodeId); + Update->BreakSetLocks(); } bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const { @@ -1059,8 +1059,10 @@ bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const { return ok && 
(Self->TabletID() == tabletId); } -bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const { - if (Update && Update->LockTxId == lockId && Update->WriteTables) { +bool TSysLocks::HasCurrentWriteLock(const TTableId& tableId) const { + Y_VERIFY(Update && Update->LockTxId); + + if (Update->WriteTables) { if (auto* table = Locker.FindTablePtr(tableId.PathId)) { if (table->IsInList<TTableLocksWriteListTag>()) { return true; @@ -1068,7 +1070,7 @@ bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const { } } - if (auto* lock = Locker.FindLockPtr(lockId)) { + if (auto* lock = Locker.FindLockPtr(Update->LockTxId)) { if (lock->WriteTables.contains(tableId.PathId)) { return true; } @@ -1077,6 +1079,22 @@ bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const { return false; } +bool TSysLocks::HasCurrentWriteLocks() const { + Y_VERIFY(Update && Update->LockTxId); + + if (Update->WriteTables) { + return true; + } + + if (auto* lock = Locker.FindLockPtr(Update->LockTxId)) { + if (lock->IsWriteLock()) { + return true; + } + } + + return false; +} + bool TSysLocks::HasWriteLocks(const TTableId& tableId) const { if (Update && Update->WriteTables) { return true; @@ -1091,14 +1109,31 @@ bool TSysLocks::HasWriteLocks(const TTableId& tableId) const { return false; } -bool TSysLocks::MayAddLock(ui64 lockId) const { - if (auto* lock = Locker.FindLockPtr(lockId)) { - // We may expand the lock unless it's broken - return !lock->IsBroken(); +EEnsureCurrentLock TSysLocks::EnsureCurrentLock() { + Y_VERIFY(Update && Update->LockTxId); + Y_VERIFY(Db, "EnsureCurrentLock needs a valid locks database"); + + if (auto* lock = Locker.FindLockPtr(Update->LockTxId)) { + // We cannot expand a broken lock + if (lock->IsBroken()) { + return EEnsureCurrentLock::Broken; + } + + Update->Lock = lock; + + return EEnsureCurrentLock::Success; + } + + if (!Db->MayAddLock(Update->LockTxId)) { + return EEnsureCurrentLock::Abort; + } + + Update->Lock = 
Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId); + if (!Update->Lock) { + return EEnsureCurrentLock::TooMany; } - Y_VERIFY(Db, "MayAddLock needs a valid locks database"); - return Db->MayAddLock(lockId); + return EEnsureCurrentLock::Success; } TSysLocks::TLock TSysLocks::MakeLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const { diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index d33a9777567..5acb2690e07 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -259,6 +259,14 @@ public: TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs); ~TLockInfo(); + bool Empty() const { + return !( + IsPersistent() || + !ReadTables.empty() || + !WriteTables.empty() || + IsBroken()); + } + template<class TTag> bool IsInList() const { using TItem = TIntrusiveListItem<TLockInfo, TTag>; @@ -620,22 +628,19 @@ struct TLocksUpdate { return bool(AffectedTables) || bool(ReadConflictLocks) || bool(WriteConflictLocks); } - void AddRangeLock(const TRangeKey& range, ui64 lockId, ui32 lockNodeId) { - Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); + void AddRangeLock(const TRangeKey& range) { ReadTables.PushBack(range.Table.Get()); AffectedTables.PushBack(range.Table.Get()); RangeLocks.push_back(range); } - void AddPointLock(const TPointKey& key, ui64 lockId, ui32 lockNodeId) { - Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); + void AddPointLock(const TPointKey& key) { ReadTables.PushBack(key.Table.Get()); AffectedTables.PushBack(key.Table.Get()); PointLocks.push_back(key); } - void AddWriteLock(TTableLocks* table, ui64 lockId, ui32 lockNodeId) { - Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); + void AddWriteLock(TTableLocks* table) { WriteTables.PushBack(table); AffectedTables.PushBack(table); } @@ -668,8 +673,7 @@ struct TLocksUpdate { EraseLocks.PushBack(lock); } - 
void BreakSetLocks(ui64 lockId, ui32 lockNodeId) { - Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); + void BreakSetLocks() { BreakOwn = true; } }; @@ -678,6 +682,22 @@ struct TLocksCache { THashMap<ui64, TSysTables::TLocksTable::TLock> Locks; }; +enum class EEnsureCurrentLock { + // New lock was created or an existing unbroken lock was found. + Success, + // Lock was already broken, this is not an error for some operations, e.g. + // readonly snapshot operations may usually continue until they write. + Broken, + // Some constraint prevents adding new lock, e.g. there are too many locks + // or not enough memory. This is usually similar to Broken. + TooMany, + // Operation must abort due to temporal violation. This may happen when + // transaction was already marked committed or aborted in the transaction + // map, but not yet fully compacted. New reads and especially writes may + // cause inconsistencies or data corruption and cannot be performed. + Abort, +}; + /// /sys/locks table logic class TSysLocks { public: @@ -690,8 +710,21 @@ public: , Locker(self) {} - void SetTxUpdater(TLocksUpdate* up) { - Update = up; + void SetupUpdate(TLocksUpdate* update, ILocksDb* db = nullptr) noexcept { + Y_VERIFY(!Update, "Cannot setup a recursive update"); + Y_VERIFY(update, "Cannot setup a nullptr update"); + Update = update; + Db = db; + } + + void ResetUpdate() noexcept { + if (Y_LIKELY(Update)) { + if (Update->Lock && Update->Lock->Empty()) { + Locker.RemoveLock(Update->LockTxId, nullptr); + } + Update = nullptr; + Db = nullptr; + } } void SetAccessLog(TLocksCache* log) { @@ -702,10 +735,6 @@ public: Cache = cache; } - void SetDb(ILocksDb* db) { - Db = db; - } - ui64 CurrentLockTxId() const { Y_VERIFY(Update); return Update->LockTxId; @@ -724,20 +753,33 @@ public: TLock GetLock(const TArrayRef<const TCell>& syslockKey) const; void EraseLock(const TArrayRef<const TCell>& syslockKey); void CommitLock(const TArrayRef<const TCell>& syslockKey); - void SetLock(const 
TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); - void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId); - void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); + void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key); + void SetLock(const TTableId& tableId, const TTableRange& range); + void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key); void BreakLock(ui64 lockId); - void BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key); - void AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId); - void AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId); - void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); + void BreakLocks(const TTableId& tableId, const TArrayRef<const TCell>& key); + void AddReadConflict(ui64 conflictId); + void AddWriteConflict(ui64 conflictId); + void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key); void BreakAllLocks(const TTableId& tableId); - void BreakSetLocks(ui64 lockTxId, ui32 lockNodeId); + void BreakSetLocks(); bool IsMyKey(const TArrayRef<const TCell>& key) const; - bool HasWriteLock(ui64 lockId, const TTableId& tableId) const; + bool HasCurrentWriteLock(const TTableId& tableId) const; + bool HasCurrentWriteLocks() const; bool HasWriteLocks(const TTableId& tableId) const; - bool MayAddLock(ui64 lockId) const; + + /** + * Ensures current update has a valid lock pointer + * + * Prerequisites: TSetupSysLocks with LockId and LocksDb is active, and + * operation is planning to set read or write locks. + * + * Returns Success when a new lock is allocated or an existing (unbroken) + * lock is found. Returns Broken when an existing lock is found but is + * already broken. Returns TooMany when a new lock cannot be allocated, + * e.g. due to memory or other constraints. 
Returns Abort when operation must abort + * early, e.g. because the given LockId cannot be reused. + */ + EEnsureCurrentLock EnsureCurrentLock(); ui64 LocksCount() const { return Locker.LocksCount(); } ui64 BrokenLocksCount() const { return Locker.BrokenLocksCount(); } diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp index e7724d352f0..8f15575378a 100644 --- a/ydb/core/tx/datashard/datashard_ut_locks.cpp +++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp @@ -159,21 +159,21 @@ namespace NTest { template <typename T> void SetLock(const TPointKey<T>& key) { - Locks.SetLock(TableId, key.GetRow(), LockId(), 0); + Locks.SetLock(TableId, key.GetRow()); } template <typename T> void SetLock(const TRangeKey<T>& range) { - Locks.SetLock(TableId, range.GetRowsRange(), LockId(), 0); + Locks.SetLock(TableId, range.GetRowsRange()); } template <typename T> - void BreakLock(const TPointKey<T>& key) { - Locks.BreakLock(TableId, key.GetRow()); + void BreakLocks(const TPointKey<T>& key) { + Locks.BreakLocks(TableId, key.GetRow()); } void BreakSetLocks() { - Locks.BreakSetLocks(LockId(), 0); + Locks.BreakSetLocks(); } // @@ -191,17 +191,17 @@ namespace NTest { void StartTx(TLocksUpdate& update) { update.LockTxId = 0; - Locks.SetTxUpdater(&update); + Locks.SetupUpdate(&update); } void StartTx(ui64 lockTxId, TLocksUpdate& update) { update.LockTxId = lockTxId; - Locks.SetTxUpdater(&update); + Locks.SetupUpdate(&update); } TVector<TSysLocks::TLock> ApplyTxLocks() { auto locks = Locks.ApplyLocks(); - Locks.SetTxUpdater(nullptr); + Locks.ResetUpdate(); return locks; } @@ -233,7 +233,7 @@ namespace NTest { template <typename T> void Update(const TVector<T>& updates) { for (auto& value : updates) { - BreakLock(TLockTester::TPointKey<T>(value)); + BreakLocks(TLockTester::TPointKey<T>(value)); } } diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 0cd1e6aec2f..b75720bb163 
100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -2603,6 +2603,98 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { observer.InjectClearTasks = false; observer.InjectLocks.reset(); } + + Y_UNIT_TEST(LockedWriteReuseAfterCommit) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle 
lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Write uncommitted changes to key 3 with tx 123 + // The lock for tx 123 was committed and removed, and cannot be reused + // until all changes are fully compacted. Otherwise new changes will + // appear as immediately committed in the past. + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 31) + )")), + "ERROR: ABORTED"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + } } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index 4f9d2066994..29a8a625806 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -121,6 +121,35 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, return EExecutionStatus::Restart; } engine->SetMemoryLimit(txc.GetMemoryLimit() - tx->GetDataTx()->GetTxSize()); + + if (guardLocks.LockTxId) { + switch (DataShard.SysLocksTable().EnsureCurrentLock()) { + case EEnsureCurrentLock::Success: + // Lock is valid, we may continue with reads and 
side-effects + break; + + case EEnsureCurrentLock::Broken: + // Lock is valid, but broken, we could abort early in some + // cases, but it doesn't affect correctness. + break; + + case EEnsureCurrentLock::TooMany: + // Lock cannot be created, it's not necessarily a problem + // for read-only transactions, for non-readonly we need to + // abort; + if (op->IsReadOnly()) { + break; + } + + [[fallthrough]]; + + case EEnsureCurrentLock::Abort: + // Lock cannot be created and we must abort + op->SetAbortedFlag(); + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN); + return EExecutionStatus::Executed; + } + } } try { @@ -260,8 +289,8 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, KqpFillTxStats(DataShard, counters, *result); } - if (counters.InvisibleRowSkips) { - DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId()); + if (counters.InvisibleRowSkips && op->LockTxId()) { + DataShard.SysLocksTable().BreakSetLocks(); } AddLocksToResult(op, ctx); diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index f2c6bc2f808..0059180e3ae 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -140,7 +140,7 @@ public: continue; } - DataShard.SysLocksTable().BreakLock(fullTableId, keyCells.GetCells()); + DataShard.SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells()); txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion); } diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 385189adafe..81a30fb91fd 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -121,7 +121,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio return EExecutionStatus::Executed; } 
- try { auto& kqpTx = dataTx->GetKqpTransaction(); auto& tasksRunner = dataTx->GetKqpTasksRunner(); @@ -135,6 +134,38 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio return EExecutionStatus::Restart; } + if (guardLocks.LockTxId) { + switch (DataShard.SysLocksTable().EnsureCurrentLock()) { + case EEnsureCurrentLock::Success: + // Lock is valid, we may continue with reads and side-effects + break; + + case EEnsureCurrentLock::Broken: + // Lock is valid, but broken, we could abort early in some + // cases, but it doesn't affect correctness. + break; + + case EEnsureCurrentLock::TooMany: + // Lock cannot be created, it's not necessarily a problem + // for read-only transactions, for non-readonly we need to + // abort; + if (op->IsReadOnly()) { + break; + } + + [[fallthrough]]; + + case EEnsureCurrentLock::Abort: + // Lock cannot be created and we must abort + LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId + << " aborting because it cannot acquire locks"); + + op->SetAbortedFlag(); + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN); + return EExecutionStatus::Executed; + } + } + if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) { KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); DataShard.SysLocksTable().ApplyLocks(); @@ -229,8 +260,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio op->Result().Swap(result); op->SetKqpAttachedRSFlag(); - if (dataTx->GetCounters().InvisibleRowSkips) { - DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId()); + if (dataTx->GetCounters().InvisibleRowSkips && op->LockTxId()) { + DataShard.SysLocksTable().BreakSetLocks(); } AddLocksToResult(op, ctx); diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h index f10426c2a83..5a4c7eb65c3 100644 --- a/ydb/core/tx/datashard/setup_sys_locks.h +++ b/ydb/core/tx/datashard/setup_sys_locks.h @@ -17,8 
+17,7 @@ struct TSetupSysLocks CheckVersion = TRowVersion::Max(); BreakVersion = TRowVersion::Min(); - SysLocksTable.SetTxUpdater(this); - SysLocksTable.SetDb(db); + SysLocksTable.SetupUpdate(this, db); } TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, TDataShard& self, ILocksDb* db) @@ -29,8 +28,7 @@ struct TSetupSysLocks CheckVersion = TRowVersion::Max(); BreakVersion = TRowVersion::Min(); - SysLocksTable.SetTxUpdater(this); - SysLocksTable.SetDb(db); + SysLocksTable.SetupUpdate(this, db); } TSetupSysLocks(TOperation::TPtr op, @@ -55,19 +53,17 @@ struct TSetupSysLocks BreakVersion = outOfOrder ? writeVersion : TRowVersion::Min(); } - SysLocksTable.SetTxUpdater(this); if (!op->LocksCache().Locks.empty()) SysLocksTable.SetCache(&op->LocksCache()); else SysLocksTable.SetAccessLog(&op->LocksAccessLog()); - SysLocksTable.SetDb(db); + SysLocksTable.SetupUpdate(this, db); } ~TSetupSysLocks() { - SysLocksTable.SetTxUpdater(nullptr); + SysLocksTable.ResetUpdate(); SysLocksTable.SetCache(nullptr); SysLocksTable.SetAccessLog(nullptr); - SysLocksTable.SetDb(nullptr); } }; |