aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-09-15 18:25:20 +0300
committersnaury <snaury@ydb.tech>2022-09-15 18:25:20 +0300
commit2553b9e00feb59b262b8db6d94aa6a2ea4f19119 (patch)
treef5f6d6985288e08408f07f7fc45dec0ff44e39ff
parent3982fa8cd76790fb21c9ecfb5775cdcaa2368005 (diff)
downloadydb-2553b9e00feb59b262b8db6d94aa6a2ea4f19119.tar.gz
Break lock conflicts on commit
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp21
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.h2
-rw-r--r--ydb/core/tx/datashard/datashard_locks.cpp53
-rw-r--r--ydb/core/tx/datashard/datashard_locks.h1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp345
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp4
7 files changed, 418 insertions, 13 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index f1f18dcf3ca..294b14beedf 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -793,7 +793,7 @@ void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TCo
}
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
- "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry));
+ "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId);
auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Read, keyTypes, columnOps, itemsLimit,
0 /* bytesLimit */, reverse);
@@ -817,6 +817,9 @@ void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range
columnOps.emplace_back(std::move(op));
}
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+ "-- AddWriteRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId);
+
auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update;
auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps);
Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ true));
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 9a784c24545..ea17bb7b9bb 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -205,7 +205,7 @@ bool NeedEraseLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
}
}
-bool NeedCommitLockChanges(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
+bool NeedCommitLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
switch (op) {
case NKikimrTxDataShard::TKqpLocks::Commit:
return true;
@@ -256,6 +256,10 @@ TVector<NKikimrTxDataShard::TLock> ValidateLocks(const NKikimrTxDataShard::TKqpL
auto lock = sysLocks.GetLock(lockKey);
if (lock.Generation != lockProto.GetGeneration() || lock.Counter != lockProto.GetCounter()) {
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "ValidateLocks: broken lock "
+ << lockProto.GetLockId()
+ << " expected " << lockProto.GetGeneration() << ":" << lockProto.GetCounter()
+ << " found " << lock.Generation << ":" << lock.Counter);
brokenLocks.push_back(lockProto);
}
}
@@ -669,20 +673,27 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
}
}
-void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) {
+void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) {
auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
if (!kqpTx.HasLocks()) {
return;
}
- if (NeedCommitLockChanges(kqpTx.GetLocks().GetOp())) {
+ TSysLocks& sysLocks = dataShard.SysLocksTable();
+
+ if (NeedCommitLocks(kqpTx.GetLocks().GetOp())) {
// We assume locks have been validated earlier
for (auto& lockProto : kqpTx.GetLocks().GetLocks()) {
if (lockProto.GetDataShard() != origin) {
continue;
}
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLock " << lockProto.ShortDebugString());
+
+ auto lockKey = MakeLockKey(lockProto);
+ sysLocks.CommitLock(lockKey);
+
TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId());
auto localTid = dataShard.GetLocalTableId(tableId);
Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << origin);
@@ -692,9 +703,11 @@ void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataS
continue;
}
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLockChanges: committing txId# " << txId << " in localTid# " << localTid);
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLocks: commit txId# " << txId << " in localTid# " << localTid);
txc.DB.CommitTx(localTid, txId);
}
+ } else {
+ KqpEraseLocks(origin, tx, sysLocks);
}
}
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 1f45d28a1fe..505db767966 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -37,7 +37,7 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
-void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc);
+void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc);
void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters);
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index ef6b7ef4b4e..f56ed44503d 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -8,6 +8,27 @@
namespace NKikimr {
namespace NDataShard {
+/**
+ * Locks are tested without an actor system, where actor context is unavailable
+ *
+ * This class is the minimum context implementation that doesn't log anything
+ * when running outside an event handler.
+ */
+struct TLockLoggerContext {
+ TLockLoggerContext() = default;
+
+ NLog::TSettings* LoggerSettings() const {
+ return TlsActivationContext ? TlsActivationContext->LoggerSettings() : nullptr;
+ }
+
+ bool Send(TAutoPtr<IEventHandle> ev) const {
+ return TlsActivationContext ? TlsActivationContext->Send(ev) : false;
+ }
+};
+
+// Logger requires an l-value, so we use an empty static variable
+static TLockLoggerContext LockLoggerContext;
+
// TLockInfo
TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
@@ -93,6 +114,8 @@ void TLockInfo::SetBroken(TRowVersion at) {
}
if (!IsBroken(at)) {
+ LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, "Lock " << LockId << " marked broken at " << at);
+
BreakVersion = at;
Locker->ScheduleRemoveBrokenRanges(LockId, at);
@@ -820,6 +843,7 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const {
auto it = Cache->Locks.find(lockTxId);
if (it != Cache->Locks.end())
return it->second;
+ LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, "TSysLocks::GetLock: lock " << lockTxId << " not found in cache");
return TLock();
}
@@ -828,8 +852,8 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const {
auto &checkVersion = Update->CheckVersion;
TLockInfo::TPtr txLock = Locker.GetLock(lockTxId, checkVersion);
if (txLock) {
- const auto& tableIds = txLock->GetReadTables();
if (key.size() == 2) { // locks v1
+ const auto& tableIds = txLock->GetReadTables();
Y_VERIFY(tableIds.size() == 1);
return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), *tableIds.begin());
} else { // locks v2
@@ -837,9 +861,20 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const {
TPathId tableId;
ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::SchemeShard, tableId.OwnerId);
ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::PathId, tableId.LocalPathId);
- if (ok && tableId && tableIds.contains(tableId))
- return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), tableId);
+ if (ok && tableId) {
+ if (txLock->GetReadTables().contains(tableId) || txLock->GetWriteTables().contains(tableId)) {
+ return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), tableId);
+ } else {
+ LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD,
+ "TSysLocks::GetLock: lock " << lockTxId << " exists, but not set for table " << tableId);
+ }
+ } else {
+ LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD,
+ "TSysLocks::GetLock: bad request for lock " << lockTxId);
+ }
}
+ } else {
+ LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, "TSysLocks::GetLock: lock " << lockTxId << " not found");
}
Self->IncCounter(COUNTER_LOCKS_LOST);
@@ -853,6 +888,18 @@ void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) {
}
}
+void TSysLocks::CommitLock(const TArrayRef<const TCell>& key) {
+ Y_VERIFY(Update);
+ if (auto* lock = Locker.FindLockPtr(GetLockId(key))) {
+ for (auto& pr : lock->ConflictLocks) {
+ if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) {
+ Update->AddBreakLock(pr.first);
+ }
+ }
+ Update->AddEraseLock(lock);
+ }
+}
+
void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
Y_VERIFY(!TSysTables::IsSystemTable(tableId));
if (!Self->IsUserTable(tableId))
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 0731925188f..7c44359ef58 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -714,6 +714,7 @@ public:
ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const;
TLock GetLock(const TArrayRef<const TCell>& syslockKey) const;
void EraseLock(const TArrayRef<const TCell>& syslockKey);
+ void CommitLock(const TArrayRef<const TCell>& syslockKey);
void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId);
void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 92c2dfa267c..ecd4f660735 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3,6 +3,7 @@
#include "datashard_active_transaction.h"
#include <ydb/core/formats/factory.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
@@ -1477,6 +1478,20 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
TRowVersion MvccSnapshot = TRowVersion::Min();
};
+ struct TLockInfo {
+ ui64 LockId;
+ ui64 DataShard;
+ ui32 Generation;
+ ui64 Counter;
+ ui64 SchemeShard;
+ ui64 PathId;
+ };
+
+ struct TInjectLocks {
+ NKikimrTxDataShard::TKqpLocks_ELocksOp Op = NKikimrTxDataShard::TKqpLocks::Commit;
+ TVector<TLockInfo> Locks;
+ };
+
class TInjectLockSnapshotObserver {
public:
TInjectLockSnapshotObserver(TTestActorRuntime& runtime)
@@ -1500,7 +1515,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
if (record.GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) {
NKikimrTxDataShard::TDataTransaction tx;
Y_VERIFY(tx.ParseFromString(record.GetTxBody()));
- Cerr << "TxBody:" << Endl;
+ Cerr << "TxBody (original):" << Endl;
Cerr << tx.DebugString() << Endl;
if (tx.HasMiniKQL()) {
using namespace NKikimr::NMiniKQL;
@@ -1511,6 +1526,38 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
Cerr << PrintNode(node.GetNode()) << Endl;
}
if (tx.HasKqpTransaction()) {
+ if (InjectClearTasks && tx.GetKqpTransaction().TasksSize() > 0) {
+ tx.MutableKqpTransaction()->ClearTasks();
+ TString txBody;
+ Y_VERIFY(tx.SerializeToString(&txBody));
+ record.SetTxBody(txBody);
+ Cerr << "TxBody: cleared Tasks" << Endl;
+ }
+ if (InjectLocks) {
+ auto* protoLocks = tx.MutableKqpTransaction()->MutableLocks();
+ protoLocks->SetOp(InjectLocks->Op);
+ protoLocks->ClearLocks();
+ TSet<ui64> shards;
+ for (auto& lock : InjectLocks->Locks) {
+ auto* protoLock = protoLocks->AddLocks();
+ protoLock->SetLockId(lock.LockId);
+ protoLock->SetDataShard(lock.DataShard);
+ protoLock->SetGeneration(lock.Generation);
+ protoLock->SetCounter(lock.Counter);
+ protoLock->SetSchemeShard(lock.SchemeShard);
+ protoLock->SetPathId(lock.PathId);
+ shards.insert(lock.DataShard);
+ }
+ protoLocks->ClearSendingShards();
+ for (ui64 shard : shards) {
+ protoLocks->AddSendingShards(shard);
+ protoLocks->AddReceivingShards(shard);
+ }
+ TString txBody;
+ Y_VERIFY(tx.SerializeToString(&txBody));
+ record.SetTxBody(txBody);
+ Cerr << "TxBody: injected Locks" << Endl;
+ }
for (const auto& task : tx.GetKqpTransaction().GetTasks()) {
if (task.HasProgram() && task.GetProgram().GetRaw()) {
using namespace NKikimr::NMiniKQL;
@@ -1534,6 +1581,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
TString txBody;
Y_VERIFY(tx.SerializeToString(&txBody));
record.SetTxBody(txBody);
+ Cerr << "TxBody: injected LockId" << Endl;
}
if (record.HasMvccSnapshot()) {
Last.MvccSnapshot.Step = record.GetMvccSnapshot().GetStep();
@@ -1541,10 +1589,27 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
} else if (Inject.MvccSnapshot) {
record.MutableMvccSnapshot()->SetStep(Inject.MvccSnapshot.Step);
record.MutableMvccSnapshot()->SetTxId(Inject.MvccSnapshot.TxId);
+ Cerr << "TEvProposeTransaction: injected MvccSnapshot" << Endl;
}
}
break;
}
+ case TEvDataShard::TEvProposeTransactionResult::EventType: {
+ auto& record = ev->Get<TEvDataShard::TEvProposeTransactionResult>()->Record;
+ Cerr << "TEvProposeTransactionResult:" << Endl;
+ Cerr << record.DebugString() << Endl;
+ LastLocks.clear();
+ for (auto& protoLock : record.GetTxLocks()) {
+ auto& lock = LastLocks.emplace_back();
+ lock.LockId = protoLock.GetLockId();
+ lock.DataShard = protoLock.GetDataShard();
+ lock.Generation = protoLock.GetGeneration();
+ lock.Counter = protoLock.GetCounter();
+ lock.SchemeShard = protoLock.GetSchemeShard();
+ lock.PathId = protoLock.GetPathId();
+ }
+ break;
+ }
}
return PrevObserver(Runtime, ev);
}
@@ -1556,6 +1621,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
public:
TLockSnapshot Last;
TLockSnapshot Inject;
+ TVector<TLockInfo> LastLocks;
+ std::optional<TInjectLocks> InjectLocks;
+ bool InjectClearTasks = false;
};
Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) {
@@ -1786,6 +1854,281 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"} Struct { Bool: false }");
}
+ Y_UNIT_TEST(MvccSnapshotLockedWritesWithoutConflicts) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+ TLockHandle lock2handle(234, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 2 with tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (2, 22)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Verify these changes are not visible yet
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Send a dummy upsert that we will be used as commit carrier for tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Verify tx 123 changes are now visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } "
+ "} Struct { Bool: false }");
+
+ // Send a dummy upsert that we will be used as commit carrier for tx 234
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks2;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Verify tx 234 changes are now visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 22 } } } "
+ "} Struct { Bool: false }");
+
+ // The still open read tx must have broken locks now
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3)
+ )")),
+ "ERROR: ABORTED");
+ }
+
+ Y_UNIT_TEST(MvccSnapshotLockedWritesWithConflicts) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+ TLockHandle lock2handle(234, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 2 with tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (2, 22)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Verify these changes are not visible yet
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Verify the open tx can commit writes (not broken yet)
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3)
+ )")),
+ "<empty>");
+
+ // Send a dummy upsert that we will be used as commit carrier for tx 234
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks2;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Verify tx 234 changes are now visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 22 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Send a dummy upsert that we will be used as commit carrier for tx 123
+ // It must not be able to commit, since it was broken by tx 234
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1)
+ )")),
+ "ERROR: ABORTED");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+ }
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 5ff35cd413e..692c9967c2f 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -168,7 +168,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
dataTx->SetReadVersion(readVersion);
dataTx->SetWriteVersion(writeVersion);
- KqpCommitLockChanges(tabletId, tx, DataShard, txc);
+ KqpCommitLocks(tabletId, tx, DataShard, txc);
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
@@ -184,8 +184,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
op->Result().Swap(result);
op->SetKqpAttachedRSFlag();
- KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
-
if (dataTx->GetCounters().InvisibleRowSkips) {
DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId());
}