author     snaury <snaury@ydb.tech>  2022-10-05 12:22:14 +0300
committer  snaury <snaury@ydb.tech>  2022-10-05 12:22:14 +0300
commit     0ce4feb0aeffff80b8f48ae565835263a8bbd7fa (patch)
tree       a4acf92e385fd79eabe610c2f352f39373cced6c
parent     042eb79d581762c5c3c023887e62fc7f9578ac65 (diff)
download   ydb-0ce4feb0aeffff80b8f48ae565835263a8bbd7fa.tar.gz
Ordering guarantees for distributed commits with write locks
-rw-r--r--  ydb/core/tx/datashard/datashard_dep_tracker.cpp                54
-rw-r--r--  ydb/core/tx/datashard/datashard_dep_tracker.h                   8
-rw-r--r--  ydb/core/tx/datashard/datashard_kqp.cpp                         7
-rw-r--r--  ydb/core/tx/datashard/datashard_locks.cpp                      61
-rw-r--r--  ydb/core/tx/datashard/datashard_locks.h                        66
-rw-r--r--  ydb/core/tx/datashard/datashard_pipeline.cpp                   21
-rw-r--r--  ydb/core/tx/datashard/datashard_pipeline.h                      7
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_snapshot.cpp               573
-rw-r--r--  ydb/core/tx/datashard/direct_tx_unit.cpp                        5
-rw-r--r--  ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp         5
-rw-r--r--  ydb/core/tx/datashard/execute_data_tx_unit.cpp                 25
-rw-r--r--  ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp    10
-rw-r--r--  ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp              8
-rw-r--r--  ydb/core/tx/datashard/setup_sys_locks.h                         5
-rw-r--r--  ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp            8
15 files changed, 789 insertions, 74 deletions
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
index 0fc88d43d05..ee6bf31f4d2 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
@@ -619,7 +619,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
// First pass, gather all reads/writes expanded with locks, add lock based dependencies
bool haveReads = false;
bool haveWrites = false;
- bool haveWriteLock = false;
+ bool commitWriteLock = false;
if (haveKeys) {
size_t keysCount = 0;
const auto& keysInfo = op->GetKeysInfo();
@@ -666,6 +666,41 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
// Reading a lock means checking it for validity, i.e. "reading" those predicted keys
if (!vk.IsWrite) {
+ auto lock = Parent.Self->SysLocksTable().GetRawLock(lockTxId, readVersion);
+
+ if (lock) {
+ lock->SetLastOpId(op->GetTxId());
+ if (locksCache.Locks.contains(lockTxId) && lock->IsPersistent()) {
+ // This lock was cached before, and since we know
+ // it's persistent, we know it was also frozen
+ // during that lock caching. Restore the frozen
+ // flag for this lock.
+ lock->SetFrozen();
+ }
+ }
+
+ if (lock && lock->IsWriteLock()) {
+ commitWriteLock = true;
+ }
+
+ if (lock && !op->IsImmediate()) {
+ // We must be careful not to reorder operations that we
+ // know break each other on commit. These dependencies
+ // are only needed between planned operations, and
+ // always point one way (towards older operations), so
+ // there are no deadlocks. Immediate operations will
+ // detect breaking attempts at runtime and may decide
+ // to add new dependencies dynamically, depending on
+ // the real order between operations.
+ lock->ForAllConflicts([&](auto* conflictLock) {
+ if (auto conflictOp = Parent.FindLastLockOp(conflictLock->GetLockId())) {
+ if (!conflictOp->IsImmediate()) {
+ op->AddDependency(conflictOp);
+ }
+ }
+ });
+ }
+
if (auto it = locksCache.Locks.find(lockTxId); it != locksCache.Locks.end()) {
// This transaction uses locks cache, so lock check
// outcome was persisted and restored. Unfortunately
@@ -684,7 +719,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
Parent.TmpRead.insert(Parent.TmpRead.end(), it->second.Keys.begin(), it->second.Keys.end());
}
}
- } else if (auto lock = Parent.Self->SysLocksTable().GetRawLock(lockTxId, readVersion)) {
+ } else if (lock) {
Y_ASSERT(!lock->IsBroken(readVersion));
haveReads = true;
if (!tooManyKeys && (keysCount += (lock->NumPoints() + lock->NumRanges())) > MAX_REORDER_TX_KEYS) {
@@ -702,14 +737,21 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
}
}
}
- if (lock->IsWriteLock()) {
- haveWriteLock = true;
- }
}
}
}
}
+ // Distributed commits over unknown keys are complicated: readsets are
+ // almost certainly involved and it is difficult to keep the commit
+ // atomic, so for now we treat such transactions as global writers to
+ // avoid out-of-order execution issues. Immediate operations are atomic
+ // by construction and will find conflicts dynamically; we may want to
+ // do something similar for distributed commits as well.
+ if (commitWriteLock && !op->IsImmediate()) {
+ isGlobalWriter = true;
+ }
+
if (tooManyKeys) {
if (haveReads) {
isGlobalReader = true;
@@ -735,7 +777,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
if (op->IsMvccSnapshotRead()) {
snapshot = op->GetMvccSnapshot();
snapshotRepeatable = op->IsMvccSnapshotRepeatable();
- } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter && !haveWriteLock)) {
+ } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter && !commitWriteLock)) {
snapshot = readVersion;
op->SetMvccSnapshot(snapshot, /* repeatable */ false);
}
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.h b/ydb/core/tx/datashard/datashard_dep_tracker.h
index 22f49b2badb..4491ac6a285 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.h
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.h
@@ -105,6 +105,14 @@ public:
GetTrackingLogic().RemoveOperation(op);
}
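+ // Returns the last operation recorded for the given lock id, or nullptr when none is tracked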
+ TOperation::TPtr FindLastLockOp(ui64 lockTxId) const {
+ auto it = LastLockOps.find(lockTxId);
+ if (it != LastLockOps.end()) {
+ return it->second;
+ }
+ return nullptr;
+ }
+
private:
void ClearTmpRead() noexcept;
void ClearTmpWrite() noexcept;
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 01c51025612..fa574570ae7 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -465,7 +465,12 @@ void KqpSetTxLocksKeys(const NKikimrTxDataShard::TKqpLocks& locks, const TSysLoc
auto lockKey = MakeLockKey(lock);
if (sysLocks.IsMyKey(lockKey)) {
auto point = TTableRange(lockKey, true, {}, true, /* point */ true);
- engineBay.AddReadRange(sysLocksTableId, {}, point, lockRowType);
+ if (NeedValidateLocks(locks.GetOp())) {
+ engineBay.AddReadRange(sysLocksTableId, {}, point, lockRowType);
+ }
+ if (NeedEraseLocks(locks.GetOp())) {
+ engineBay.AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true);
+ }
}
}
}
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index 686a8c63265..999ddd5973e 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -363,28 +363,6 @@ void TTableLocks::RemoveWriteLock(TLockInfo* lock) {
WriteLocks.erase(lock);
}
-bool TTableLocks::BreakShardLocks(const TRowVersion& at) {
- bool broken = false;
- for (TLockInfo* lock : ShardLocks) {
- lock->SetBroken(at);
- broken = true;
- }
- return broken;
-}
-
-bool TTableLocks::BreakAllLocks(const TRowVersion& at) {
- bool broken = false;
- for (TLockInfo* lock : ShardLocks) {
- lock->SetBroken(at);
- broken = true;
- }
- Ranges.EachRange([&](const TRangeTreeBase::TRange&, TLockInfo* lock) {
- lock->SetBroken(at);
- broken = true;
- });
- return broken;
-}
-
// TLockLocker
void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key) {
@@ -450,22 +428,6 @@ void TLockLocker::BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& l
RemoveBrokenRanges();
}
-void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at) {
- for (auto& table : tables) {
- table.BreakShardLocks(at);
- }
-
- RemoveBrokenRanges();
-}
-
-void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at) {
- for (auto& table : tables) {
- table.BreakAllLocks(at);
- }
-
- RemoveBrokenRanges();
-}
-
void TLockLocker::RemoveBrokenRanges() {
for (ui64 lockId : CleanupPending) {
auto it = Locks.find(lockId);
@@ -679,7 +641,10 @@ void TLockLocker::ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at)
}
void TLockLocker::RemoveSubscribedLock(ui64 lockId, ILocksDb* db) {
- RemoveLock(lockId, db);
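+ // Frozen locks must not be removed even when their subscription is gone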
+ auto it = Locks.find(lockId);
+ if (it != Locks.end() && !it->second->IsFrozen()) {
+ RemoveLock(lockId, db);
+ }
}
void TLockLocker::SaveBrokenPersistentLocks(ILocksDb* db) {
@@ -704,7 +669,7 @@ TLocksUpdate::~TLocksUpdate() {
cleanList(AffectedTables);
cleanList(BreakLocks);
cleanList(BreakShardLocks);
- cleanList(BreakAllLocks);
+ cleanList(BreakRangeLocks);
cleanList(ReadConflictLocks);
cleanList(WriteConflictLocks);
cleanList(WriteConflictShardLocks);
@@ -727,18 +692,11 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
// TODO: move this somewhere earlier, like the start of a new update guard
Locker.RemoveBrokenRanges();
+ Update->FlattenBreakLocks();
if (Update->BreakLocks) {
Locker.BreakLocks(Update->BreakLocks, breakVersion);
}
- if (Update->BreakShardLocks) {
- Locker.BreakLocks(Update->BreakShardLocks, breakVersion);
- }
-
- if (Update->BreakAllLocks) {
- Locker.BreakLocks(Update->BreakAllLocks, breakVersion);
- }
-
Locker.SaveBrokenPersistentLocks(Db);
// Merge shard lock conflicts into write conflicts, we do this once as an optimization
@@ -1041,8 +999,11 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) {
return;
if (auto* table = Locker.FindTablePtr(tableId)) {
- if (table->HasRangeLocks() || table->HasShardLocks()) {
- Update->AddBreakAllLocks(table);
+ if (table->HasShardLocks()) {
+ Update->AddBreakShardLocks(table);
+ }
+ if (table->HasRangeLocks()) {
+ Update->AddBreakRangeLocks(table);
}
}
}
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index d5c910fb4eb..e9e844a9242 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -316,6 +316,19 @@ public:
void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags);
void RestorePersistentConflict(TLockInfo* otherLock);
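+ // Invokes callback for every conflicting lock, i.e. each lock this lock would break on commit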
+ template<class TCallback>
+ void ForAllConflicts(TCallback&& callback) {
+ for (auto& pr : ConflictLocks) {
+ callback(pr.first);
+ }
+ }
+
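+ // Id of the last operation that worked with this lock, used to rebuild runtime dependencies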
+ ui64 GetLastOpId() const { return LastOpId; }
+ void SetLastOpId(ui64 opId) { LastOpId = opId; }
+
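+ // A frozen lock had its check outcome persisted for a distributed commit and must not be broken or removed until that commit completes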
+ bool IsFrozen() const { return Frozen; }
+ void SetFrozen() { Frozen = true; }
+
private:
void MakeShardLock();
bool AddShardLock(const TPathId& pathId);
@@ -355,13 +368,16 @@ private:
// A set of locks we must break on commit
THashMap<TLockInfo*, ELockConflictFlags> ConflictLocks;
TVector<TPersistentRange> PersistentRanges;
+
+ ui64 LastOpId = 0;
+ bool Frozen = false;
};
struct TTableLocksReadListTag {};
struct TTableLocksWriteListTag {};
struct TTableLocksAffectedListTag {};
struct TTableLocksBreakShardListTag {};
-struct TTableLocksBreakAllListTag {};
+struct TTableLocksBreakRangeListTag {};
struct TTableLocksWriteConflictShardListTag {};
///
@@ -371,7 +387,7 @@ class TTableLocks
, public TIntrusiveListItem<TTableLocks, TTableLocksWriteListTag>
, public TIntrusiveListItem<TTableLocks, TTableLocksAffectedListTag>
, public TIntrusiveListItem<TTableLocks, TTableLocksBreakShardListTag>
- , public TIntrusiveListItem<TTableLocks, TTableLocksBreakAllListTag>
+ , public TIntrusiveListItem<TTableLocks, TTableLocksBreakRangeListTag>
, public TIntrusiveListItem<TTableLocks, TTableLocksWriteConflictShardListTag>
{
friend class TSysLocks;
@@ -401,8 +417,6 @@ public:
void RemoveShardLock(TLockInfo* lock);
void RemoveRangeLock(TLockInfo* lock);
void RemoveWriteLock(TLockInfo* lock);
- bool BreakShardLocks(const TRowVersion& at);
- bool BreakAllLocks(const TRowVersion& at);
ui64 NumKeyColumns() const {
return KeyColumnTypes.size();
@@ -431,6 +445,27 @@ public:
WriteLocks.clear();
}
+ template<class TCallback>
+ void ForEachRangeLock(TCallback&& callback) {
+ Ranges.EachRange([&callback](const TRangeTreeBase::TRange&, TLockInfo* lock) {
+ callback(lock);
+ });
+ }
+
+ template<class TCallback>
+ void ForEachShardLock(TCallback&& callback) {
+ for (TLockInfo* lock : ShardLocks) {
+ callback(lock);
+ }
+ }
+
+ template<class TCallback>
+ void ForEachWriteLock(TCallback&& callback) {
+ for (TLockInfo* lock : WriteLocks) {
+ callback(lock);
+ }
+ }
+
private:
const TPathId TableId;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
@@ -480,8 +515,6 @@ public:
ui64 BrokenLocksCount() const { return BrokenLocksCount_; }
void BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& locks, const TRowVersion& at);
- void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at);
- void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at);
void ForceBreakLock(ui64 lockId);
void RemoveLock(ui64 lockTxId, ILocksDb* db);
@@ -609,7 +642,7 @@ struct TLocksUpdate {
TIntrusiveList<TLockInfo, TLockInfoBreakListTag> BreakLocks;
TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag> BreakShardLocks;
- TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag> BreakAllLocks;
+ TIntrusiveList<TTableLocks, TTableLocksBreakRangeListTag> BreakRangeLocks;
TIntrusiveList<TLockInfo, TLockInfoReadConflictListTag> ReadConflictLocks;
TIntrusiveList<TLockInfo, TLockInfoWriteConflictListTag> WriteConflictLocks;
@@ -653,8 +686,23 @@ struct TLocksUpdate {
BreakShardLocks.PushBack(table);
}
- void AddBreakAllLocks(TTableLocks* table) {
- BreakAllLocks.PushBack(table);
+ void AddBreakRangeLocks(TTableLocks* table) {
+ BreakRangeLocks.PushBack(table);
+ }
+
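+ // Moves per-table shard and range break requests into the flat BreakLocks list, so individual locks can be inspected (e.g. for the frozen flag) before breaking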
+ void FlattenBreakLocks() {
+ while (BreakShardLocks) {
+ TTableLocks* table = BreakShardLocks.PopFront();
+ table->ForEachShardLock([this](TLockInfo* lock) {
+ BreakLocks.PushBack(lock);
+ });
+ }
+ while (BreakRangeLocks) {
+ TTableLocks* table = BreakRangeLocks.PopFront();
+ table->ForEachRangeLock([this](TLockInfo* lock) {
+ BreakLocks.PushBack(lock);
+ });
+ }
}
void AddReadConflictLock(TLockInfo* lock) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 7a8d128190a..4467290cba0 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1904,4 +1904,25 @@ bool TPipeline::MarkPlannedLogicallyIncompleteUpTo(const TRowVersion& version, T
return hadWrites;
}
+bool TPipeline::AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& guardLocks) {
+ bool addedDependencies = false;
+
+ guardLocks.FlattenBreakLocks();
+ for (auto& lock : guardLocks.BreakLocks) {
+ // We cannot break frozen locks: find their corresponding
+ // operations and add dependencies so this op is rescheduled
+ if (lock.IsFrozen()) {
+ if (auto conflictOp = FindOp(lock.GetLastOpId())) {
+ if (conflictOp != op) {
+ // FIXME: make sure this op is not complete
+ op->AddDependency(conflictOp);
+ addedDependencies = true;
+ }
+ }
+ }
+ }
+
+ return addedDependencies;
+}
+
}}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 96bcbdfe52e..b2199949f7d 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -366,6 +366,13 @@ public:
*/
bool MarkPlannedLogicallyIncompleteUpTo(const TRowVersion& version, TTransactionContext& txc);
+ /**
+ * Adds new runtime dependencies to op based on its buffered lock updates.
+ *
+ * Returns true when new dependencies were added and op must be rescheduled.
+ */
+ bool AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& guardLocks);
+
private:
struct TStoredExecutionProfile {
TBasicOpInfo OpInfo;
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index d63d3774c73..d86aeb3b27f 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1491,6 +1491,10 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
struct TInjectLocks {
NKikimrTxDataShard::TKqpLocks_ELocksOp Op = NKikimrTxDataShard::TKqpLocks::Commit;
TVector<TLockInfo> Locks;
+
+ void AddLocks(const TVector<TLockInfo>& locks) {
+ Locks.insert(Locks.end(), locks.begin(), locks.end());
+ }
};
class TInjectLockSnapshotObserver {
@@ -1611,10 +1615,25 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
break;
}
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ if (BlockReadSets) {
+ Cerr << "... blocked TEvReadSet" << Endl;
+ BlockedReadSets.push_back(THolder(ev.Release()));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
}
return PrevObserver(Runtime, ev);
}
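+ // Re-sends previously blocked readsets and stops blocking new ones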
+ void UnblockReadSets() {
+ BlockReadSets = false;
+ for (auto& ev : BlockedReadSets) {
+ Runtime.Send(ev.Release(), 0, true);
+ }
+ }
+
private:
TTestActorRuntime& Runtime;
TTestActorRuntime::TEventObserver PrevObserver;
@@ -1625,6 +1644,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
TVector<TLockInfo> LastLocks;
std::optional<TInjectLocks> InjectLocks;
bool InjectClearTasks = false;
+ bool BlockReadSets = false;
+ TVector<THolder<IEventHandle>> BlockedReadSets;
};
Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) {
@@ -2695,6 +2716,558 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
auto locks2 = observer.LastLocks;
observer.Inject = {};
}
+
+ Y_UNIT_TEST(LockedWriteDistributedCommitSuccess) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 20 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace();
+ observer.InjectLocks->AddLocks(locks1);
+ observer.InjectLocks->AddLocks(locks2);
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0);
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Verify changes are now visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } "
+ "} Struct { Bool: false }");
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-2`
+ WHERE key >= 10 AND key <= 30
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 10 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 20 } } Struct { Optional { Uint32: 21 } } } "
+ "} Struct { Bool: false }");
+ }
+
+ Y_UNIT_TEST(LockedWriteDistributedCommitAborted) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 20 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write to key 20; this will break lock tx 123
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 22)
+ )")),
+ "<empty>");
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace();
+ observer.InjectLocks->AddLocks(locks1);
+ observer.InjectLocks->AddLocks(locks2);
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0);
+ )")),
+ "ERROR: ABORTED");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Verify changes are not visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-2`
+ WHERE key >= 10 AND key <= 30
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 10 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 20 } } Struct { Optional { Uint32: 22 } } } "
+ "} Struct { Bool: false }");
+ }
+
+ Y_UNIT_TEST(LockedWriteDistributedCommitFreeze) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 20 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Commit changes in tx 123
+ observer.BlockReadSets = true;
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace();
+ observer.InjectLocks->AddLocks(locks1);
+ observer.InjectLocks->AddLocks(locks2);
+ auto commitSender = runtime.AllocateEdgeActor();
+ SendRequest(runtime, commitSender, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0);
+ )")));
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ UNIT_ASSERT(!observer.BlockedReadSets.empty());
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ auto writeSender = runtime.AllocateEdgeActor();
+ SendRequest(runtime, writeSender, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22)
+ )")));
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ // Verify changes are not visible yet
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ observer.UnblockReadSets();
+
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(commitSender);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(writeSender);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+ }
+
+ Y_UNIT_TEST(LockedWriteDistributedCommitCrossConflict) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+ TLockHandle lock2handle(234, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 20 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 21)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 3 with tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 22)
+ )")),
+ "<empty>");
+ auto locks3 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 30 with tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (30, 22)
+ )")),
+ "<empty>");
+ auto locks4 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Commit changes in tx 123 (we expect locks to be ready for sending)
+ observer.BlockReadSets = true;
+ //observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace();
+ observer.InjectLocks->AddLocks(locks1);
+ observer.InjectLocks->AddLocks(locks2);
+ auto commitSender1 = runtime.AllocateEdgeActor();
+ SendRequest(runtime, commitSender1, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 21);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (30, 21);
+ )")));
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ UNIT_ASSERT(!observer.BlockedReadSets.empty());
+ //observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Commit changes in tx 234 (we expect it to be blocked and broken by 123)
+ observer.BlockReadSets = true;
+ //observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace();
+ observer.InjectLocks->AddLocks(locks3);
+ observer.InjectLocks->AddLocks(locks4);
+ auto commitSender2 = runtime.AllocateEdgeActor();
+ SendRequest(runtime, commitSender2, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 22);
+ )")));
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ UNIT_ASSERT(!observer.BlockedReadSets.empty());
+ //observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Verify changes are not visible yet
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ observer.UnblockReadSets();
+
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(commitSender1);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(commitSender2);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::ABORTED);
+ }
+
+ // Verify only changes from commit 123 are visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 21 } } } "
+ "} Struct { Bool: false }");
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-2`
+ WHERE key >= 10 AND key <= 30
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 10 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 20 } } Struct { Optional { Uint32: 21 } } } "
+ "List { Struct { Optional { Uint32: 30 } } Struct { Optional { Uint32: 21 } } } "
+ "} Struct { Bool: false }");
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index a9d840cace0..1d0ea8ad17d 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -40,6 +40,11 @@ public:
return EExecutionStatus::Restart;
}
+ if (Pipeline.AddLockDependencies(op, guardLocks)) {
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+
op->ChangeRecords() = std::move(tx->GetCollectedChanges());
DataShard.SysLocksTable().ApplyLocks();
diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
index e1949849105..cc31f453a65 100644
--- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
@@ -46,6 +46,11 @@ public:
DataShard.SysLocksTable().BreakAllLocks(fullTableId);
txc.DB.CommitTx(tableInfo.LocalTid, writeTxId, versions.WriteVersion);
+ if (Pipeline.AddLockDependencies(op, guardLocks)) {
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
DataShard.SysLocksTable().ApplyLocks();
DataShard.SubscribeNewLocks(ctx);
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index 29a8a625806..33a1e8caafe 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -24,8 +24,12 @@ public:
private:
void ExecuteDataTx(TOperation::TPtr op,
- const TActorContext& ctx);
+ const TActorContext& ctx,
+ TSetupSysLocks& guardLocks);
void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx);
+
+private:
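+ // Thrown when execution discovers new lock dependencies and the operation must be rescheduled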
+ class TRescheduleOpException : public yexception {};
};
TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard,
@@ -154,7 +158,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
try {
try {
- ExecuteDataTx(op, ctx);
+ ExecuteDataTx(op, ctx, guardLocks);
} catch (const TNotReadyTabletException&) {
// We want to try pinning (actually precharging) all required pages
// before restarting the transaction, to minimize future restarts.
@@ -186,6 +190,14 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
tx->ReleaseTxData(txc, ctx);
return EExecutionStatus::Restart;
+ } catch (const TRescheduleOpException&) {
+ LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID()
+ << " needs to reschedule " << *op << " for dependencies");
+
+ tx->ReleaseTxData(txc, ctx);
+
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
}
DataShard.IncCounter(COUNTER_WAIT_EXECUTE_LATENCY_MS, waitExecuteLatency.MilliSeconds());
@@ -204,7 +216,9 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
}
void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
- const TActorContext& ctx) {
+ const TActorContext& ctx,
+ TSetupSysLocks& guardLocks)
+{
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
IEngineFlat* engine = tx->GetDataTx()->GetEngine();
@@ -232,6 +246,11 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
}
IEngineFlat::EResult engineResult = engine->Execute();
+
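+ // Execution may have tried to break frozen locks; in that case add dependencies and reschedule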
+ if (Pipeline.AddLockDependencies(op, guardLocks)) {
+ throw TRescheduleOpException();
+ }
+
if (engineResult != IEngineFlat::EResult::Ok) {
TString errorMessage = TStringBuilder() << "Datashard execution error for " << *op << " at "
<< DataShard.TabletID() << ": " << engine->GetErrors();
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 6e68eabd848..d02c82aae7e 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -58,6 +58,11 @@ public:
return EExecutionStatus::Restart;
}
+ if (Pipeline.AddLockDependencies(op, guardLocks)) {
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+
if (changeCollector) {
op->ChangeRecords() = std::move(changeCollector->GetCollected());
}
@@ -78,6 +83,11 @@ public:
Y_VERIFY(ok);
}
}
+
+ if (Pipeline.AddLockDependencies(op, guardLocks)) {
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
} else {
Y_FAIL_S("Invalid distributed erase tx: " << eraseTx->GetBody().ShortDebugString());
}
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 81a30fb91fd..2a4c014a419 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -256,6 +256,14 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
return OnTabletNotReady(*tx, *dataTx, txc, ctx);
}
+ if (Pipeline.AddLockDependencies(op, guardLocks)) {
+ allocGuard.Release();
+ dataTx->ResetCollectedChanges();
+ tx->ReleaseTxData(txc, ctx);
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+
Y_VERIFY(result);
op->Result().Swap(result);
op->SetKqpAttachedRSFlag();
diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h
index 5a4c7eb65c3..503f6f2e3e9 100644
--- a/ydb/core/tx/datashard/setup_sys_locks.h
+++ b/ydb/core/tx/datashard/setup_sys_locks.h
@@ -14,9 +14,6 @@ struct TSetupSysLocks
TSetupSysLocks(TDataShard& self, ILocksDb* db)
: SysLocksTable(self.SysLocksTable())
{
- CheckVersion = TRowVersion::Max();
- BreakVersion = TRowVersion::Min();
-
SysLocksTable.SetupUpdate(this, db);
}
@@ -25,8 +22,6 @@ struct TSetupSysLocks
{
LockTxId = lockTxId;
LockNodeId = lockNodeId;
- CheckVersion = TRowVersion::Max();
- BreakVersion = TRowVersion::Min();
SysLocksTable.SetupUpdate(this, db);
}
diff --git a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
index a648b765fd2..9686c587f50 100644
--- a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
@@ -57,6 +57,14 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op,
// N.B. we copy access log to locks cache, so that future lock access is repeatable
tx->LocksCache().Locks = tx->LocksAccessLog().Locks;
tx->DbStoreLocksAccessLog(&DataShard, txc, ctx);
+ // Freeze persistent locks that we have cached
+ for (auto& pr : tx->LocksCache().Locks) {
+ ui64 lockId = pr.first;
+ auto lock = DataShard.SysLocksTable().GetRawLock(lockId);
+ if (lock && lock->IsPersistent()) {
+ lock->SetFrozen();
+ }
+ }
tx->MarkLocksStored();
newArtifact = true;
}