diff options
author | snaury <snaury@ydb.tech> | 2022-09-15 18:25:20 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-09-15 18:25:20 +0300 |
commit | 2553b9e00feb59b262b8db6d94aa6a2ea4f19119 (patch) | |
tree | f5f6d6985288e08408f07f7fc45dec0ff44e39ff | |
parent | 3982fa8cd76790fb21c9ecfb5775cdcaa2368005 (diff) | |
download | ydb-2553b9e00feb59b262b8db6d94aa6a2ea4f19119.tar.gz |
Break lock conflicts on commit
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.cpp | 53 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 345 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 4 |
7 files changed, 418 insertions, 13 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index f1f18dcf3ca..294b14beedf 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -793,7 +793,7 @@ void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TCo } LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, - "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry)); + "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId); auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Read, keyTypes, columnOps, itemsLimit, 0 /* bytesLimit */, reverse); @@ -817,6 +817,9 @@ void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range columnOps.emplace_back(std::move(op)); } + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "-- AddWriteRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId); + auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update; auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps); Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ true)); diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 9a784c24545..ea17bb7b9bb 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -205,7 +205,7 @@ bool NeedEraseLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { } } -bool NeedCommitLockChanges(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { +bool NeedCommitLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { switch (op) { case NKikimrTxDataShard::TKqpLocks::Commit: return true; @@ -256,6 +256,10 @@ TVector<NKikimrTxDataShard::TLock> ValidateLocks(const NKikimrTxDataShard::TKqpL auto lock = sysLocks.GetLock(lockKey); if (lock.Generation != lockProto.GetGeneration() || lock.Counter != lockProto.GetCounter()) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "ValidateLocks: broken lock " + << lockProto.GetLockId() + << " expected " << lockProto.GetGeneration() << ":" << lockProto.GetCounter() + << " found " << lock.Generation << ":" << lock.Counter); brokenLocks.push_back(lockProto); } } @@ -669,20 +673,27 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { } } -void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) { +void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) { auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); if (!kqpTx.HasLocks()) { return; } - if (NeedCommitLockChanges(kqpTx.GetLocks().GetOp())) { + TSysLocks& sysLocks = dataShard.SysLocksTable(); + + if (NeedCommitLocks(kqpTx.GetLocks().GetOp())) { // We assume locks have been validated earlier for (auto& lockProto : kqpTx.GetLocks().GetLocks()) { if (lockProto.GetDataShard() != origin) { continue; } + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLock " << lockProto.ShortDebugString()); + + auto lockKey = MakeLockKey(lockProto); + sysLocks.CommitLock(lockKey); + TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId()); auto localTid = dataShard.GetLocalTableId(tableId); Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << origin); @@ -692,9 +703,11 @@ void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataS continue; } - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLockChanges: committing txId# " << txId << " in localTid# " << localTid); + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLocks: commit txId# " << txId << " in localTid# " << localTid); txc.DB.CommitTx(localTid, txId); } + } else { + KqpEraseLocks(origin, tx, sysLocks); } } diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index 1f45d28a1fe..505db767966 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -37,7 +37,7 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); -void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc); +void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc); void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters); diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index ef6b7ef4b4e..f56ed44503d 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -8,6 +8,27 @@ namespace NKikimr { namespace NDataShard { +/** + * Locks are tested without an actor system, where actor context is unavailable + * + * This class is the minimum context implementation that doesn't log anything + * when running outside an event handler. + */ +struct TLockLoggerContext { + TLockLoggerContext() = default; + + NLog::TSettings* LoggerSettings() const { + return TlsActivationContext ? TlsActivationContext->LoggerSettings() : nullptr; + } + + bool Send(TAutoPtr<IEventHandle> ev) const { + return TlsActivationContext ? TlsActivationContext->Send(ev) : false; + } +}; + +// Logger requires an l-value, so we use an empty static variable +static TLockLoggerContext LockLoggerContext; + // TLockInfo TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId) @@ -93,6 +114,8 @@ void TLockInfo::SetBroken(TRowVersion at) { } if (!IsBroken(at)) { + LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, "Lock " << LockId << " marked broken at " << at); + BreakVersion = at; Locker->ScheduleRemoveBrokenRanges(LockId, at); @@ -820,6 +843,7 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const { auto it = Cache->Locks.find(lockTxId); if (it != Cache->Locks.end()) return it->second; + LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, "TSysLocks::GetLock: lock " << lockTxId << " not found in cache"); return TLock(); } @@ -828,8 +852,8 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const { auto &checkVersion = Update->CheckVersion; TLockInfo::TPtr txLock = Locker.GetLock(lockTxId, checkVersion); if (txLock) { - const auto& tableIds = txLock->GetReadTables(); if (key.size() == 2) { // locks v1 + const auto& tableIds = txLock->GetReadTables(); Y_VERIFY(tableIds.size() == 1); return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), *tableIds.begin()); } else { // locks v2 @@ -837,9 +861,20 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const { TPathId tableId; ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::SchemeShard, tableId.OwnerId); ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::PathId, tableId.LocalPathId); - if (ok && tableId && tableIds.contains(tableId)) - return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), tableId); + if (ok && tableId) { + if (txLock->GetReadTables().contains(tableId) || txLock->GetWriteTables().contains(tableId)) { + return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), tableId); + } else { + LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, + "TSysLocks::GetLock: lock " << lockTxId << " exists, but not set for table " << tableId); + } + } else { + LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, + "TSysLocks::GetLock: bad request for lock " << lockTxId); + } } + } else { + LOG_TRACE_S(LockLoggerContext, NKikimrServices::TX_DATASHARD, "TSysLocks::GetLock: lock " << lockTxId << " not found"); } Self->IncCounter(COUNTER_LOCKS_LOST); @@ -853,6 +888,18 @@ void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) { } } +void TSysLocks::CommitLock(const TArrayRef<const TCell>& key) { + Y_VERIFY(Update); + if (auto* lock = Locker.FindLockPtr(GetLockId(key))) { + for (auto& pr : lock->ConflictLocks) { + if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) { + Update->AddBreakLock(pr.first); + } + } + Update->AddEraseLock(lock); + } +} + void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { Y_VERIFY(!TSysTables::IsSystemTable(tableId)); if (!Self->IsUserTable(tableId)) diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index 0731925188f..7c44359ef58 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -714,6 +714,7 @@ public: ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const; TLock GetLock(const TArrayRef<const TCell>& syslockKey) const; void EraseLock(const TArrayRef<const TCell>& syslockKey); + void CommitLock(const TArrayRef<const TCell>& syslockKey); void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId); void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 92c2dfa267c..ecd4f660735 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -3,6 +3,7 @@ #include "datashard_active_transaction.h" #include <ydb/core/formats/factory.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD) @@ -1477,6 +1478,20 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { TRowVersion MvccSnapshot = TRowVersion::Min(); }; + struct TLockInfo { + ui64 LockId; + ui64 DataShard; + ui32 Generation; + ui64 Counter; + ui64 SchemeShard; + ui64 PathId; + }; + + struct TInjectLocks { + NKikimrTxDataShard::TKqpLocks_ELocksOp Op = NKikimrTxDataShard::TKqpLocks::Commit; + TVector<TLockInfo> Locks; + }; + class TInjectLockSnapshotObserver { public: TInjectLockSnapshotObserver(TTestActorRuntime& runtime) @@ -1500,7 +1515,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { if (record.GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) { NKikimrTxDataShard::TDataTransaction tx; Y_VERIFY(tx.ParseFromString(record.GetTxBody())); - Cerr << "TxBody:" << Endl; + Cerr << "TxBody (original):" << Endl; Cerr << tx.DebugString() << Endl; if (tx.HasMiniKQL()) { using namespace NKikimr::NMiniKQL; @@ -1511,6 +1526,38 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { Cerr << PrintNode(node.GetNode()) << Endl; } if (tx.HasKqpTransaction()) { + if (InjectClearTasks && tx.GetKqpTransaction().TasksSize() > 0) { + tx.MutableKqpTransaction()->ClearTasks(); + TString txBody; + Y_VERIFY(tx.SerializeToString(&txBody)); + record.SetTxBody(txBody); + Cerr << "TxBody: cleared Tasks" << Endl; + } + if (InjectLocks) { + auto* protoLocks = tx.MutableKqpTransaction()->MutableLocks(); + protoLocks->SetOp(InjectLocks->Op); + protoLocks->ClearLocks(); + TSet<ui64> shards; + for (auto& lock : InjectLocks->Locks) { + auto* protoLock = protoLocks->AddLocks(); + protoLock->SetLockId(lock.LockId); + protoLock->SetDataShard(lock.DataShard); + protoLock->SetGeneration(lock.Generation); + protoLock->SetCounter(lock.Counter); + protoLock->SetSchemeShard(lock.SchemeShard); + protoLock->SetPathId(lock.PathId); + shards.insert(lock.DataShard); + } + protoLocks->ClearSendingShards(); + for (ui64 shard : shards) { + protoLocks->AddSendingShards(shard); + protoLocks->AddReceivingShards(shard); + } + TString txBody; + Y_VERIFY(tx.SerializeToString(&txBody)); + record.SetTxBody(txBody); + Cerr << "TxBody: injected Locks" << Endl; + } for (const auto& task : tx.GetKqpTransaction().GetTasks()) { if (task.HasProgram() && task.GetProgram().GetRaw()) { using namespace NKikimr::NMiniKQL; @@ -1534,6 +1581,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { TString txBody; Y_VERIFY(tx.SerializeToString(&txBody)); record.SetTxBody(txBody); + Cerr << "TxBody: injected LockId" << Endl; } if (record.HasMvccSnapshot()) { Last.MvccSnapshot.Step = record.GetMvccSnapshot().GetStep(); @@ -1541,10 +1589,27 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } else if (Inject.MvccSnapshot) { record.MutableMvccSnapshot()->SetStep(Inject.MvccSnapshot.Step); record.MutableMvccSnapshot()->SetTxId(Inject.MvccSnapshot.TxId); + Cerr << "TEvProposeTransaction: injected MvccSnapshot" << Endl; } } break; } + case TEvDataShard::TEvProposeTransactionResult::EventType: { + auto& record = ev->Get<TEvDataShard::TEvProposeTransactionResult>()->Record; + Cerr << "TEvProposeTransactionResult:" << Endl; + Cerr << record.DebugString() << Endl; + LastLocks.clear(); + for (auto& protoLock : record.GetTxLocks()) { + auto& lock = LastLocks.emplace_back(); + lock.LockId = protoLock.GetLockId(); + lock.DataShard = protoLock.GetDataShard(); + lock.Generation = protoLock.GetGeneration(); + lock.Counter = protoLock.GetCounter(); + lock.SchemeShard = protoLock.GetSchemeShard(); + lock.PathId = protoLock.GetPathId(); + } + break; + } } return PrevObserver(Runtime, ev); } @@ -1556,6 +1621,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { public: TLockSnapshot Last; TLockSnapshot Inject; + TVector<TLockInfo> LastLocks; + std::optional<TInjectLocks> InjectLocks; + bool InjectClearTasks = false; }; Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) { @@ -1786,6 +1854,281 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "} Struct { Bool: false }"); } + Y_UNIT_TEST(MvccSnapshotLockedWritesWithoutConflicts) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + TLockHandle lock2handle(234, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 2 with tx 234 + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (2, 22) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Verify these changes are not visible yet + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Send a dummy upsert that we will be used as commit carrier for tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Verify tx 123 changes are now visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } " + "} Struct { Bool: false }"); + + // Send a dummy upsert that we will be used as commit carrier for tx 234 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks2; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Verify tx 234 changes are now visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 22 } } } " + "} Struct { Bool: false }"); + + // The still open read tx must have broken locks now + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3) + )")), + "ERROR: ABORTED"); + } + + Y_UNIT_TEST(MvccSnapshotLockedWritesWithConflicts) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + TLockHandle lock2handle(234, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 2 with tx 234 + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (2, 22) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Verify these changes are not visible yet + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Verify the open tx can commit writes (not broken yet) + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3) + )")), + "<empty>"); + + // Send a dummy upsert that we will be used as commit carrier for tx 234 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks2; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Verify tx 234 changes are now visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 22 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Send a dummy upsert that we will be used as commit carrier for tx 123 + // It must not be able to commit, since it was broken by tx 234 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (1, 1) + )")), + "ERROR: ABORTED"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + } } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 5ff35cd413e..692c9967c2f 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -168,7 +168,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio dataTx->SetReadVersion(readVersion); dataTx->SetWriteVersion(writeVersion); - KqpCommitLockChanges(tabletId, tx, DataShard, txc); + KqpCommitLocks(tabletId, tx, DataShard, txc); auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx(); @@ -184,8 +184,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio op->Result().Swap(result); op->SetKqpAttachedRSFlag(); - KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); - if (dataTx->GetCounters().InvisibleRowSkips) { DataShard.SysLocksTable().BreakSetLocks(op->LockTxId(), op->LockNodeId()); } |