diff options
author | snaury <snaury@ydb.tech> | 2022-10-05 12:22:14 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-10-05 12:22:14 +0300 |
commit | 0ce4feb0aeffff80b8f48ae565835263a8bbd7fa (patch) | |
tree | a4acf92e385fd79eabe610c2f352f39373cced6c | |
parent | 042eb79d581762c5c3c023887e62fc7f9578ac65 (diff) | |
download | ydb-0ce4feb0aeffff80b8f48ae565835263a8bbd7fa.tar.gz |
Ordering guarantees for distributed commits with write locks
-rw-r--r-- | ydb/core/tx/datashard/datashard_dep_tracker.cpp | 54 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_dep_tracker.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.cpp | 61 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.h | 66 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.h | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 573 | ||||
-rw-r--r-- | ydb/core/tx/datashard/direct_tx_unit.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_data_tx_unit.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/setup_sys_locks.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp | 8 |
15 files changed, 789 insertions, 74 deletions
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp index 0fc88d43d05..ee6bf31f4d2 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp +++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp @@ -619,7 +619,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera // First pass, gather all reads/writes expanded with locks, add lock based dependencies bool haveReads = false; bool haveWrites = false; - bool haveWriteLock = false; + bool commitWriteLock = false; if (haveKeys) { size_t keysCount = 0; const auto& keysInfo = op->GetKeysInfo(); @@ -666,6 +666,41 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera // Reading a lock means checking it for validity, i.e. "reading" those predicted keys if (!vk.IsWrite) { + auto lock = Parent.Self->SysLocksTable().GetRawLock(lockTxId, readVersion); + + if (lock) { + lock->SetLastOpId(op->GetTxId()); + if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent()) { + // This lock was cached before, and since we know + // it's persistent, we know it was also frozen + // during that lock caching. Restore the frozen + // flag for this lock. + lock->SetFrozen(); + } + } + + if (lock && lock->IsWriteLock()) { + commitWriteLock = true; + } + + if (lock && !op->IsImmediate()) { + // We must be careful not to reorder operations that we + // know break each other on commit. These dependencies + // are only needed between planned operations, and + // always point one way (towards older operations), so + // there are no deadlocks. Immediate operations will + // detect breaking attempts at runtime and may decide + // to add new dependencies dynamically, depending on + // the real order between operations. + lock->ForAllConflicts([&](auto* conflictLock) { + if (auto conflictOp = Parent.FindLastLockOp(conflictLock->GetLockId())) { + if (!conflictOp->IsImmediate()) { + op->AddDependency(conflictOp); + } + } + }); + } + if (auto it = locksCache.Locks.find(lockTxId); it != locksCache.Locks.end()) { // This transaction uses locks cache, so lock check // outcome was persisted and restored. Unfortunately @@ -684,7 +719,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera Parent.TmpRead.insert(Parent.TmpRead.end(), it->second.Keys.begin(), it->second.Keys.end()); } } - } else if (auto lock = Parent.Self->SysLocksTable().GetRawLock(lockTxId, readVersion)) { + } else if (lock) { Y_ASSERT(!lock->IsBroken(readVersion)); haveReads = true; if (!tooManyKeys && (keysCount += (lock->NumPoints() + lock->NumRanges())) > MAX_REORDER_TX_KEYS) { @@ -702,14 +737,21 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera } } } - if (lock->IsWriteLock()) { - haveWriteLock = true; - } } } } } + // Distributed commits of some unknown keys are complicated, and mean + // there are almost certainly readsets involved and it's difficult to + // make it atomic, so we currently make them global writers to handle + // out-of-order execution issues. Immediate operations are atomic by + // their construction and will find conflicts dynamically. We may + // want to do something similar with distributed commits as well. + if (commitWriteLock && !op->IsImmediate()) { + isGlobalWriter = true; + } + if (tooManyKeys) { if (haveReads) { isGlobalReader = true; @@ -735,7 +777,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera if (op->IsMvccSnapshotRead()) { snapshot = op->GetMvccSnapshot(); snapshotRepeatable = op->IsMvccSnapshotRepeatable(); - } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter && !haveWriteLock)) { + } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter && !commitWriteLock)) { snapshot = readVersion; op->SetMvccSnapshot(snapshot, /* repeatable */ false); } diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.h b/ydb/core/tx/datashard/datashard_dep_tracker.h index 22f49b2badb..4491ac6a285 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.h +++ b/ydb/core/tx/datashard/datashard_dep_tracker.h @@ -105,6 +105,14 @@ public: GetTrackingLogic().RemoveOperation(op); } + TOperation::TPtr FindLastLockOp(ui64 lockTxId) const { + auto it = LastLockOps.find(lockTxId); + if (it != LastLockOps.end()) { + return it->second; + } + return nullptr; + } + private: void ClearTmpRead() noexcept; void ClearTmpWrite() noexcept; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 01c51025612..fa574570ae7 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -465,7 +465,12 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc auto lockKey = MakeLockKey(lock); if (sysLocks.IsMyKey(lockKey)) { auto point = TTableRange(lockKey, true, {}, true, /* point */ true); - engineBay.AddReadRange(sysLocksTableId, {}, point, lockRowType); + if (NeedValidateLocks(locks.GetOp())) { + engineBay.AddReadRange(sysLocksTableId, {}, point, lockRowType); + } + if (NeedEraseLocks(locks.GetOp())) { + engineBay.AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true); + } } } } diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index 686a8c63265..999ddd5973e 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -363,28 +363,6 @@ void TTableLocks::RemoveWriteLock(TLockInfo* lock) { WriteLocks.erase(lock); } -bool TTableLocks::BreakShardLocks(const TRowVersion& at) { - bool broken = false; - for (TLockInfo* lock : ShardLocks) { - lock->SetBroken(at); - broken = true; - } - return broken; -} - -bool TTableLocks::BreakAllLocks(const TRowVersion& at) { - bool broken = false; - for (TLockInfo* lock : ShardLocks) { - lock->SetBroken(at); - broken = true; - } - Ranges.EachRange([&](const TRangeTreeBase::TRange&, TLockInfo* lock) { - lock->SetBroken(at); - broken = true; - }); - return broken; -} - // TLockLocker void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key) { @@ -450,22 +428,6 @@ void TLockLocker::BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& l RemoveBrokenRanges(); } -void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at) { - for (auto& table : tables) { - table.BreakShardLocks(at); - } - - RemoveBrokenRanges(); -} - -void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at) { - for (auto& table : tables) { - table.BreakAllLocks(at); - } - - RemoveBrokenRanges(); -} - void TLockLocker::RemoveBrokenRanges() { for (ui64 lockId : CleanupPending) { auto it = Locks.find(lockId); @@ -679,7 +641,10 @@ void TLockLocker::ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at) } void TLockLocker::RemoveSubscribedLock(ui64 lockId, ILocksDb* db) { - RemoveLock(lockId, db); + auto it = Locks.find(lockId); + if (it != Locks.end() && !it->second->IsFrozen()) { + RemoveLock(lockId, db); + } } void TLockLocker::SaveBrokenPersistentLocks(ILocksDb* db) { @@ -704,7 +669,7 @@ TLocksUpdate::~TLocksUpdate() { cleanList(AffectedTables); cleanList(BreakLocks); cleanList(BreakShardLocks); - cleanList(BreakAllLocks); + cleanList(BreakRangeLocks); cleanList(ReadConflictLocks); cleanList(WriteConflictLocks); cleanList(WriteConflictShardLocks); @@ -727,18 +692,11 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { // TODO: move this somewhere earlier, like the start of a new update guard Locker.RemoveBrokenRanges(); + Update->FlattenBreakLocks(); if (Update->BreakLocks) { Locker.BreakLocks(Update->BreakLocks, breakVersion); } - if (Update->BreakShardLocks) { - Locker.BreakLocks(Update->BreakShardLocks, breakVersion); - } - - if (Update->BreakAllLocks) { - Locker.BreakLocks(Update->BreakAllLocks, breakVersion); - } - Locker.SaveBrokenPersistentLocks(Db); // Merge shard lock conflicts into write conflicts, we do this once as an optimization @@ -1041,8 +999,11 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) { return; if (auto* table = Locker.FindTablePtr(tableId)) { - if (table->HasRangeLocks() || table->HasShardLocks()) { - Update->AddBreakAllLocks(table); + if (table->HasShardLocks()) { + Update->AddBreakShardLocks(table); + } + if (table->HasRangeLocks()) { + Update->AddBreakRangeLocks(table); } } } diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index d5c910fb4eb..e9e844a9242 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -316,6 +316,19 @@ public: void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags); void RestorePersistentConflict(TLockInfo* otherLock); + template<class TCallback> + void ForAllConflicts(TCallback&& callback) { + for (auto& pr : ConflictLocks) { + callback(pr.first); + } + } + + ui64 GetLastOpId() const { return LastOpId; } + void SetLastOpId(ui64 opId) { LastOpId = opId; } + + bool IsFrozen() const { return Frozen; } + void SetFrozen() { Frozen = true; } + private: void MakeShardLock(); bool AddShardLock(const TPathId& pathId); @@ -355,13 +368,16 @@ private: // A set of locks we must break on commit THashMap<TLockInfo*, ELockConflictFlags> ConflictLocks; TVector<TPersistentRange> PersistentRanges; + + ui64 LastOpId = 0; + bool Frozen = false; }; struct TTableLocksReadListTag {}; struct TTableLocksWriteListTag {}; struct TTableLocksAffectedListTag {}; struct TTableLocksBreakShardListTag {}; -struct TTableLocksBreakAllListTag {}; +struct TTableLocksBreakRangeListTag {}; struct TTableLocksWriteConflictShardListTag {}; /// @@ -371,7 +387,7 @@ class TTableLocks , public TIntrusiveListItem<TTableLocks, TTableLocksWriteListTag> , public TIntrusiveListItem<TTableLocks, TTableLocksAffectedListTag> , public TIntrusiveListItem<TTableLocks, TTableLocksBreakShardListTag> - , public TIntrusiveListItem<TTableLocks, TTableLocksBreakAllListTag> + , public TIntrusiveListItem<TTableLocks, TTableLocksBreakRangeListTag> , public TIntrusiveListItem<TTableLocks, TTableLocksWriteConflictShardListTag> { friend class TSysLocks; @@ -401,8 +417,6 @@ public: void RemoveShardLock(TLockInfo* lock); void RemoveRangeLock(TLockInfo* lock); void RemoveWriteLock(TLockInfo* lock); - bool BreakShardLocks(const TRowVersion& at); - bool BreakAllLocks(const TRowVersion& at); ui64 NumKeyColumns() const { return KeyColumnTypes.size(); @@ -431,6 +445,27 @@ public: WriteLocks.clear(); } + template<class TCallback> + void ForEachRangeLock(TCallback&& callback) { + Ranges.EachRange([&callback](const TRangeTreeBase::TRange&, TLockInfo* lock) { + callback(lock); + }); + } + + template<class TCallback> + void ForEachShardLock(TCallback&& callback) { + for (TLockInfo* lock : ShardLocks) { + callback(lock); + } + } + + template<class TCallback> + void ForEachWriteLock(TCallback&& callback) { + for (TLockInfo* lock : WriteLocks) { + callback(lock); + } + } + private: const TPathId TableId; TVector<NScheme::TTypeInfo> KeyColumnTypes; @@ -480,8 +515,6 @@ public: ui64 BrokenLocksCount() const { return BrokenLocksCount_; } void BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& locks, const TRowVersion& at); - void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at); - void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at); void ForceBreakLock(ui64 lockId); void RemoveLock(ui64 lockTxId, ILocksDb* db); @@ -609,7 +642,7 @@ struct TLocksUpdate { TIntrusiveList<TLockInfo, TLockInfoBreakListTag> BreakLocks; TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag> BreakShardLocks; - TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag> BreakAllLocks; + TIntrusiveList<TTableLocks, TTableLocksBreakRangeListTag> BreakRangeLocks; TIntrusiveList<TLockInfo, TLockInfoReadConflictListTag> ReadConflictLocks; TIntrusiveList<TLockInfo, TLockInfoWriteConflictListTag> WriteConflictLocks; @@ -653,8 +686,23 @@ struct TLocksUpdate { BreakShardLocks.PushBack(table); } - void AddBreakAllLocks(TTableLocks* table) { - BreakAllLocks.PushBack(table); + void AddBreakRangeLocks(TTableLocks* table) { + BreakRangeLocks.PushBack(table); + } + + void FlattenBreakLocks() { + while (BreakShardLocks) { + TTableLocks* table = BreakShardLocks.PopFront(); + table->ForEachShardLock([this](TLockInfo* lock) { + BreakLocks.PushBack(lock); + }); + } + while (BreakRangeLocks) { + TTableLocks* table = BreakRangeLocks.PopFront(); + table->ForEachRangeLock([this](TLockInfo* lock) { + BreakLocks.PushBack(lock); + }); + } } void AddReadConflictLock(TLockInfo* lock) { diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 7a8d128190a..4467290cba0 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1904,4 +1904,25 @@ bool TPipeline::MarkPlannedLogicallyIncompleteUpTo(const TRowVersion& version, T return hadWrites; } +bool TPipeline::AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& guardLocks) { + bool addedDependencies = false; + + guardLocks.FlattenBreakLocks(); + for (auto& lock : guardLocks.BreakLocks) { + // We cannot break frozen locks + // Find their corresponding operations and reschedule + if (lock.IsFrozen()) { + if (auto conflictOp = FindOp(lock.GetLastOpId())) { + if (conflictOp != op) { + // FIXME: make sure this op is not complete + op->AddDependency(conflictOp); + addedDependencies = true; + } + } + } + } + + return addedDependencies; +} + }} diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 96bcbdfe52e..b2199949f7d 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -366,6 +366,13 @@ public: */ bool MarkPlannedLogicallyIncompleteUpTo(const TRowVersion& version, TTransactionContext& txc); + /** + * Adds new runtime dependencies to op based on its buffered lock updates. + * + * Returns true when new dependencies were added and op must be rescheduled. + */ + bool AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& guardLocks); + private: struct TStoredExecutionProfile { TBasicOpInfo OpInfo; diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index d63d3774c73..d86aeb3b27f 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1491,6 +1491,10 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { struct TInjectLocks { NKikimrTxDataShard::TKqpLocks_ELocksOp Op = NKikimrTxDataShard::TKqpLocks::Commit; TVector<TLockInfo> Locks; + + void AddLocks(const TVector<TLockInfo>& locks) { + Locks.insert(Locks.end(), locks.begin(), locks.end()); + } }; class TInjectLockSnapshotObserver { @@ -1611,10 +1615,25 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } break; } + case TEvTxProcessing::TEvReadSet::EventType: { + if (BlockReadSets) { + Cerr << "... blocked TEvReadSet" << Endl; + BlockedReadSets.push_back(THolder(ev.Release())); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } } return PrevObserver(Runtime, ev); } + void UnblockReadSets() { + BlockReadSets = false; + for (auto& ev : BlockedReadSets) { + Runtime.Send(ev.Release(), 0, true); + } + } + private: TTestActorRuntime& Runtime; TTestActorRuntime::TEventObserver PrevObserver; @@ -1625,6 +1644,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { TVector<TLockInfo> LastLocks; std::optional<TInjectLocks> InjectLocks; bool InjectClearTasks = false; + bool BlockReadSets = false; + TVector<THolder<IEventHandle>> BlockedReadSets; }; Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) { @@ -2695,6 +2716,558 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { auto locks2 = observer.LastLocks; observer.Inject = {}; } + + Y_UNIT_TEST(LockedWriteDistributedCommitSuccess) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 20 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace(); + observer.InjectLocks->AddLocks(locks1); + observer.InjectLocks->AddLocks(locks2); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0); + UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0); + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Verify changes are now visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } " + "} Struct { Bool: false }"); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-2` + WHERE key >= 10 AND key <= 30 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 10 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 20 } } Struct { Optional { Uint32: 21 } } } " + "} Struct { Bool: false }"); + } + + Y_UNIT_TEST(LockedWriteDistributedCommitAborted) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 20 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Write to key 20, it will break tx 123 + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 22) + )")), + "<empty>"); + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace(); + observer.InjectLocks->AddLocks(locks1); + observer.InjectLocks->AddLocks(locks2); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0); + UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0); + )")), + "ERROR: ABORTED"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Verify changes are not visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-2` + WHERE key >= 10 AND key <= 30 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 10 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 20 } } Struct { Optional { Uint32: 22 } } } " + "} Struct { Bool: false }"); + } + + Y_UNIT_TEST(LockedWriteDistributedCommitFreeze) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 20 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Commit changes in tx 123 + observer.BlockReadSets = true; + observer.InjectClearTasks = true; + observer.InjectLocks.emplace(); + observer.InjectLocks->AddLocks(locks1); + observer.InjectLocks->AddLocks(locks2); + auto commitSender = runtime.AllocateEdgeActor(); + SendRequest(runtime, commitSender, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0); + UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0); + )"))); + runtime.SimulateSleep(TDuration::Seconds(1)); + UNIT_ASSERT(!observer.BlockedReadSets.empty()); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + auto writeSender = runtime.AllocateEdgeActor(); + SendRequest(runtime, writeSender, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22) + )"))); + runtime.SimulateSleep(TDuration::Seconds(1)); + + // Verify changes are not visible yet + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + observer.UnblockReadSets(); + + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(commitSender); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(writeSender); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + } + + Y_UNIT_TEST(LockedWriteDistributedCommitCrossConflict) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + TLockHandle lock2handle(234, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 20 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 3 with tx 234 + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 22) + )")), + "<empty>"); + auto locks3 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 30 with tx 234 + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-2` (key, value) VALUES (30, 22) + )")), + "<empty>"); + auto locks4 = observer.LastLocks; + observer.Inject = {}; + + // Commit changes in tx 123 (we expect locks to be ready for sending) + observer.BlockReadSets = true; + //observer.InjectClearTasks = true; + observer.InjectLocks.emplace(); + observer.InjectLocks->AddLocks(locks1); + observer.InjectLocks->AddLocks(locks2); + auto commitSender1 = runtime.AllocateEdgeActor(); + SendRequest(runtime, commitSender1, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 21); + UPSERT INTO `/Root/table-2` (key, value) VALUES (30, 21); + )"))); + runtime.SimulateSleep(TDuration::Seconds(1)); + UNIT_ASSERT(!observer.BlockedReadSets.empty()); + //observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Commit changes in tx 234 (we expect it to be blocked and broken by 123) + observer.BlockReadSets = true; + //observer.InjectClearTasks = true; + observer.InjectLocks.emplace(); + observer.InjectLocks->AddLocks(locks3); + observer.InjectLocks->AddLocks(locks4); + auto commitSender2 = runtime.AllocateEdgeActor(); + SendRequest(runtime, commitSender2, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 22); + )"))); + runtime.SimulateSleep(TDuration::Seconds(1)); + UNIT_ASSERT(!observer.BlockedReadSets.empty()); + //observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Verify changes are not visible yet + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + observer.UnblockReadSets(); + + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(commitSender1); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(commitSender2); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED); + } + + // Verify only changes from commit 123 are visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 21 } } } " + "} Struct { Bool: false }"); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-2` + WHERE key >= 10 AND key <= 30 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 10 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 20 } } Struct { Optional { Uint32: 21 } } } " + "List { Struct { Optional { Uint32: 30 } } Struct { Optional { Uint32: 21 } } } " + "} Struct { Bool: false }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index a9d840cace0..1d0ea8ad17d 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -40,6 +40,11 @@ public: return EExecutionStatus::Restart; } + if (Pipeline.AddLockDependencies(op, guardLocks)) { + txc.Reschedule(); + return EExecutionStatus::Restart; + } + op->ChangeRecords() = std::move(tx->GetCollectedChanges()); DataShard.SysLocksTable().ApplyLocks(); diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp index e1949849105..cc31f453a65 100644 --- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp @@ -46,6 +46,11 @@ public: DataShard.SysLocksTable().BreakAllLocks(fullTableId); txc.DB.CommitTx(tableInfo.LocalTid, writeTxId, versions.WriteVersion); + if (Pipeline.AddLockDependencies(op, guardLocks)) { + txc.Reschedule(); + return EExecutionStatus::Restart; + } + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); DataShard.SysLocksTable().ApplyLocks(); DataShard.SubscribeNewLocks(ctx); diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index 29a8a625806..33a1e8caafe 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -24,8 +24,12 @@ public: private: void ExecuteDataTx(TOperation::TPtr op, - const TActorContext& ctx); + const TActorContext& ctx, + TSetupSysLocks& guardLocks); void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx); + +private: + class TRescheduleOpException : public yexception {}; }; TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard, @@ -154,7 +158,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, try { try { - ExecuteDataTx(op, ctx); + ExecuteDataTx(op, ctx, guardLocks); } catch (const TNotReadyTabletException&) { // We want to try pinning (actually precharging) all required pages // before restarting the transaction, to minimize future restarts. @@ -186,6 +190,14 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, tx->ReleaseTxData(txc, ctx); return EExecutionStatus::Restart; + } catch (const TRescheduleOpException&) { + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() + << " needs to reschedule " << *op << " for dependencies"); + + tx->ReleaseTxData(txc, ctx); + + txc.Reschedule(); + return EExecutionStatus::Restart; } DataShard.IncCounter(COUNTER_WAIT_EXECUTE_LATENCY_MS, waitExecuteLatency.MilliSeconds()); @@ -204,7 +216,9 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, } void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, - const TActorContext& ctx) { + const TActorContext& ctx, + TSetupSysLocks& guardLocks) +{ TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); IEngineFlat* engine = tx->GetDataTx()->GetEngine(); @@ -232,6 +246,11 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, } IEngineFlat::EResult engineResult = engine->Execute(); + + if (Pipeline.AddLockDependencies(op, guardLocks)) { + throw TRescheduleOpException(); + } + if (engineResult != IEngineFlat::EResult::Ok) { TString errorMessage = TStringBuilder() << "Datashard execution error for " << *op << " at " << DataShard.TabletID() << ": " << engine->GetErrors(); diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index 6e68eabd848..d02c82aae7e 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -58,6 +58,11 @@ public: return EExecutionStatus::Restart; } + if (Pipeline.AddLockDependencies(op, guardLocks)) { + txc.Reschedule(); + return EExecutionStatus::Restart; + } + if (changeCollector) { op->ChangeRecords() = std::move(changeCollector->GetCollected()); } @@ -78,6 +83,11 @@ public: Y_VERIFY(ok); } } + + if (Pipeline.AddLockDependencies(op, guardLocks)) { + txc.Reschedule(); + return EExecutionStatus::Restart; + } } else { Y_FAIL_S("Invalid distributed erase tx: " << eraseTx->GetBody().ShortDebugString()); } diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 81a30fb91fd..2a4c014a419 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -256,6 +256,14 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio return OnTabletNotReady(*tx, *dataTx, txc, ctx); } + if (Pipeline.AddLockDependencies(op, guardLocks)) { + allocGuard.Release(); + dataTx->ResetCollectedChanges(); + tx->ReleaseTxData(txc, ctx); + txc.Reschedule(); + return EExecutionStatus::Restart; + } + Y_VERIFY(result); op->Result().Swap(result); op->SetKqpAttachedRSFlag(); diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h index 5a4c7eb65c3..503f6f2e3e9 100644 --- a/ydb/core/tx/datashard/setup_sys_locks.h +++ b/ydb/core/tx/datashard/setup_sys_locks.h @@ -14,9 +14,6 @@ struct TSetupSysLocks TSetupSysLocks(TDataShard& self, ILocksDb* db) : SysLocksTable(self.SysLocksTable()) { - CheckVersion = TRowVersion::Max(); - BreakVersion = TRowVersion::Min(); - SysLocksTable.SetupUpdate(this, db); } @@ -25,8 +22,6 @@ struct TSetupSysLocks { LockTxId = lockTxId; LockNodeId = lockNodeId; - CheckVersion = TRowVersion::Max(); - BreakVersion = TRowVersion::Min(); SysLocksTable.SetupUpdate(this, db); } diff --git a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp index a648b765fd2..9686c587f50 100644 --- a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp @@ -57,6 +57,14 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op, // N.B. we copy access log to locks cache, so that future lock access is repeatable tx->LocksCache().Locks = tx->LocksAccessLog().Locks; tx->DbStoreLocksAccessLog(&DataShard, txc, ctx); + // Freeze persistent locks that we have cached + for (auto& pr : tx->LocksCache().Locks) { + ui64 lockId = pr.first; + auto lock = DataShard.SysLocksTable().GetRawLock(lockId); + if (lock && lock->IsPersistent()) { + lock->SetFrozen(); + } + } tx->MarkLocksStored(); newArtifact = true; } |