diff options
author | snaury <snaury@ydb.tech> | 2022-09-19 23:55:20 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-09-19 23:55:20 +0300 |
commit | ba94c55d49e858bea72c265a7a99fd7da200674f (patch) | |
tree | 9dd04b5ab60acb5378bda448e524b5daabc6e1fc | |
parent | f004dcc272c4ae5004e677f4fa18c07064f1beef (diff) | |
download | ydb-ba94c55d49e858bea72c265a7a99fd7da200674f.tar.gz |
Change visibility in read iterators, conflict processing
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 192 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_dep_tracker.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 209 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/setup_sys_locks.h | 7 |
10 files changed, 388 insertions, 51 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 97f7107c562..75c346ea865 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -225,6 +225,7 @@ class TReader { const TReadIteratorState& State; IBlockBuilder& BlockBuilder; const TShortTableInfo& TableInfo; + TDataShard* Self; std::vector<NKikimr::NScheme::TTypeId> ColumnTypes; @@ -236,13 +237,17 @@ class TReader { ui64 BytesInResult = 0; - bool InvisibleRowSkipsMet = false; + bool HadInvisibleRowSkips_ = false; + bool HadInconsistentResult_ = false; NHPTimer::STime StartTime; NHPTimer::STime EndTime; static const NHPTimer::STime MaxCyclesPerIteration; + NTable::ITransactionMapPtr TxMap; + NTable::ITransactionObserverPtr TxObserver; + enum class EReadStatus { Done = 0, NeedData, @@ -252,10 +257,12 @@ class TReader { public: TReader(TReadIteratorState& state, IBlockBuilder& blockBuilder, - const TShortTableInfo& tableInfo) + const TShortTableInfo& tableInfo, + TDataShard* self) : State(state) , BlockBuilder(blockBuilder) , TableInfo(tableInfo) + , Self(self) , FirstUnprocessedQuery(State.FirstUnprocessedQuery) { GetTimeFast(&StartTime); @@ -292,10 +299,10 @@ public: EReadStatus result; if (!reverse) { - auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion); + auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); result = Iterate(iter.Get(), true, ctx); } else { - auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion); + auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); result = Iterate(iter.Get(), true, ctx); } @@ -342,9 +349,9 @@ public: NTable::TRowState rowState; rowState.Init(State.Columns.size()); NTable::TSelectStats stats; - auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion); + auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver()); RowsSinceLastCheck += 1 + stats.InvisibleRowSkips; - InvisibleRowSkipsMet |= stats.InvisibleRowSkips > 0; + HadInvisibleRowSkips_ |= stats.InvisibleRowSkips > 0; if (ready == NTable::EReady::Page) { return EReadStatus::NeedData; } @@ -505,7 +512,8 @@ public: } ui64 GetRowsRead() const { return RowsRead; } - bool HasInvisibleRowSkips() const { return InvisibleRowSkipsMet; } + bool HadInvisibleRowSkips() const { return HadInvisibleRowSkips_; } + bool HadInconsistentResult() const { return HadInconsistentResult_; } private: bool OutOfQuota() const { @@ -563,7 +571,7 @@ private: BlockBuilder.AddRow(TDbTupleRef(), rowValues); ++RowsRead; - InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0; + HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0; RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats); if (ShouldStop()) { return EReadStatus::StoppedByLimit; @@ -572,7 +580,7 @@ private: // last iteration to Page or Gone also might have deleted or invisible rows RowsSinceLastCheck += ResetRowStats(iter->Stats); - InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0; + HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0; // TODO: consider restart when Page and too few data read // (how much is too few, less than user's limit?) @@ -586,6 +594,81 @@ private: return EReadStatus::Done; } + + const NTable::ITransactionMapPtr& GetReadTxMap() { + if (!TxMap && + State.LockId && + !TSysTables::IsSystemTable(State.PathId) && + Self->SysLocksTable().HasWriteLock(State.LockId, State.PathId)) + { + TxMap = new NTable::TSingleTransactionMap(State.LockId, TRowVersion::Min()); + } + + return TxMap; + } + + const NTable::ITransactionObserverPtr& GetReadTxObserver() { + if (!TxObserver && + State.LockId && + !TSysTables::IsSystemTable(State.PathId) && + Self->SysLocksTable().HasWriteLocks(State.PathId)) + { + TxObserver = new TReadTxObserver(this); + } + + return TxObserver; + } + + class TReadTxObserver : public NTable::ITransactionObserver { + public: + TReadTxObserver(TReader* reader) + : Reader(reader) + { + } + + void OnSkipUncommitted(ui64 txId) override { + Reader->AddReadConflict(txId); + } + + void OnSkipCommitted(const TRowVersion&) override { + // We already use InvisibleRowSkips for these + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // We already use InvisibleRowSkips for these + } + + void OnApplyCommitted(const TRowVersion& rowVersion) override { + Reader->CheckReadConflict(rowVersion); + } + + void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override { + Reader->CheckReadConflict(rowVersion); + } + + private: + TReader* const Reader; + }; + + void AddReadConflict(ui64 txId) { + Y_VERIFY(State.LockId); + // We have skipped uncommitted changes in txId, which would affect + // the read result when it commits. Add a conflict edge that breaks + // our lock when txId is committed. + Self->SysLocksTable().AddReadConflict(txId, State.LockId, State.LockNodeId); + } + + void CheckReadConflict(const TRowVersion& rowVersion) { + if (rowVersion > State.ReadVersion) { + // We have applied changes from a version above our snapshot + // Normally these changes are skipped (since we are reading from + // snapshot), but if we previously written changes for a key, + // modified by transactions after our snapshot, we would hit this + // code path. We have to break our own lock and make sure we won't + // reply with inconsistent results. + HadInconsistentResult_ = true; + } + } }; const NHPTimer::STime TReader::MaxCyclesPerIteration = @@ -756,6 +839,12 @@ public: } } + state.LockId = state.Request->Record.GetLockTxId(); + state.LockNodeId = state.Request->Record.GetLockNodeId(); + + TDataShardLocksDb locksDb(*Self, txc); + TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb); + if (!Read(txc, ctx, state)) return EExecutionStatus::Restart; @@ -793,10 +882,16 @@ public: bool hadWrites = false; - if (Request->Record.HasLockTxId()) { + if (state.LockId) { // note that we set locks only when first read finish transaction, // i.e. we have read something without page faults - hadWrites |= AcquireLock(txc, ctx, state); + AcquireLock(state, ctx); + + // Make sure we wait for commit (e.g. persisted lock added a write range) + hadWrites |= locksDb.HasChanges(); + + // We remember acquired lock for faster checking + state.Lock = guard.Lock; } if (!Self->IsFollower()) { @@ -1026,15 +1121,18 @@ public: if (!Result) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TReadOperation::Execute() finished without Result, aborting"); - Self->DeleteReadIterator(it); - Result.reset(new TEvDataShard::TEvReadResult()); SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId); + Self->DeleteReadIterator(it); return; } + if (!Result->Record.HasStatus() && Reader && Reader->HadInconsistentResult()) { + SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Read conflict with concurrent transaction"); + } + // error happened and status set auto& record = Result->Record; if (record.HasStatus()) { @@ -1074,6 +1172,12 @@ public: SendResult(ctx); + it = Self->ReadIterators.find(readId); + if (it == Self->ReadIterators.end()) { + // We sent an error and delete iterator + return; + } + Y_VERIFY(it->second); // note that we save the state only when there're unread queries @@ -1188,7 +1292,7 @@ private: Y_ASSERT(Result); - Reader.reset(new TReader(state, *BlockBuilder, TableInfo)); + Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self)); return Reader->Read(txc, ctx); } @@ -1273,18 +1377,11 @@ private: ValidationInfo.Loaded = true; } - bool AcquireLock(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) { + void AcquireLock(TReadIteratorState& state, const TActorContext& ctx) { auto& sysLocks = Self->SysLocksTable(); - const auto lockTxId = state.Request->Record.GetLockTxId(); - const auto lockNodeId = state.Request->Record.GetLockNodeId(); TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion); - state.LockTxId = lockTxId; - - TDataShardLocksDb locksDb(*Self, txc); - TSetupSysLocks guard(lockTxId, lockNodeId, state.ReadVersion, *Self, &locksDb); - if (!state.Request->Keys.empty()) { for (size_t i = 0; i < state.Request->Keys.size(); ++i) { const auto& key = state.Request->Keys[i]; @@ -1295,21 +1392,21 @@ private: true, key.GetCells(), true); - sysLocks.SetLock(tableId, lockRange, lockTxId, lockNodeId); + sysLocks.SetLock(tableId, lockRange, state.LockId, state.LockNodeId); } else { - sysLocks.SetLock(tableId, key.GetCells(), lockTxId, lockNodeId); + sysLocks.SetLock(tableId, key.GetCells(), state.LockId, state.LockNodeId); } } } else { // no keys, so we must have ranges (has been checked initially) for (size_t i = 0; i < state.Request->Ranges.size(); ++i) { auto range = state.Request->Ranges[i].ToTableRange(); - sysLocks.SetLock(tableId, range, lockTxId, lockNodeId); + sysLocks.SetLock(tableId, range, state.LockId, state.LockNodeId); } } - if (Reader->HasInvisibleRowSkips()) { - sysLocks.BreakSetLocks(lockTxId, lockNodeId); + if (Reader->HadInvisibleRowSkips() || Reader->HadInconsistentResult()) { + sysLocks.BreakSetLocks(state.LockId, state.LockNodeId); } auto locks = sysLocks.ApplyLocks(); @@ -1333,9 +1430,6 @@ private: << " Acquired lock# " << lock.LockId << ", counter# " << lock.Counter << " for " << state.PathId); } - - state.Lock = guard.Lock; // will be nullptr if broken - return locksDb.HasChanges(); } }; @@ -1589,7 +1683,10 @@ public: << " ReadContinue: reader# " << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId << ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery); - Reader.reset(new TReader(state, *BlockBuilder, TableInfo)); + TDataShardLocksDb locksDb(*Self, txc); + TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb); + + Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self)); if (Reader->Read(txc, ctx)) { SendResult(txc, ctx); return true; @@ -1602,6 +1699,8 @@ public: } void SendResult(TTransactionContext& txc, const TActorContext& ctx) { + Y_UNUSED(txc); + const auto* request = Ev->Get(); TReadIteratorId readId(request->Reader, request->ReadId); auto it = Self->ReadIterators.find(readId); @@ -1612,12 +1711,12 @@ public: if (!Result) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxReadContinue::Execute() finished without Result, aborting"); - Self->DeleteReadIterator(it); Result.reset(new TEvDataShard::TEvReadResult()); SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted"); Result->Record.SetReadId(readId.ReadId); Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); + Self->DeleteReadIterator(it); return; } @@ -1633,12 +1732,14 @@ public: return; } + Y_ASSERT(Reader); + Y_ASSERT(BlockBuilder); + if (state.Lock) { - bool isBroken = state.Lock->IsBroken(state.ReadVersion); - if (!isBroken && Reader->HasInvisibleRowSkips()) { - auto& sysLocks = Self->SysLocksTable(); - TDataShardLocksDb locksDb(*Self, txc); - TSetupSysLocks guard(*Self, &locksDb); + auto& sysLocks = Self->SysLocksTable(); + + bool isBroken = state.Lock->IsBroken(); + if (!isBroken && (Reader->HadInvisibleRowSkips() || Reader->HadInconsistentResult())) { sysLocks.BreakLock(state.Lock->GetLockId()); sysLocks.ApplyLocks(); Y_VERIFY(state.Lock->IsBroken()); @@ -1657,13 +1758,24 @@ public: LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId << " TTxReadContinue::Execute() found broken lock# " << state.Lock->GetLockId()); + // A broken write lock means we are reading inconsistent results and must abort + if (state.Lock->IsWriteLock()) { + SetStatusError(record, Ydb::StatusIds::ABORTED, "Read conflict with concurrent transaction"); + record.SetSeqNo(state.SeqNo + 1); + record.SetReadId(readId.ReadId); + Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); + Self->DeleteReadIterator(it); + return; + } + state.Lock = nullptr; + } else { + // Lock valid, apply conflict changes + auto locks = sysLocks.ApplyLocks(); + Y_VERIFY(locks.empty(), "ApplyLocks acquired unexpected locks"); } } - Y_ASSERT(Reader); - Y_ASSERT(BlockBuilder); - Reader->FillResult(*Result); Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId); diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp index 1a35cb5092a..0fc88d43d05 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp +++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp @@ -619,6 +619,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera // First pass, gather all reads/writes expanded with locks, add lock based dependencies bool haveReads = false; bool haveWrites = false; + bool haveWriteLock = false; if (haveKeys) { size_t keysCount = 0; const auto& keysInfo = op->GetKeysInfo(); @@ -701,6 +702,9 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera } } } + if (lock->IsWriteLock()) { + haveWriteLock = true; + } } } } @@ -731,7 +735,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera if (op->IsMvccSnapshotRead()) { snapshot = op->GetMvccSnapshot(); snapshotRepeatable = op->IsMvccSnapshotRepeatable(); - } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter)) { + } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter && !haveWriteLock)) { snapshot = readVersion; op->SetMvccSnapshot(snapshot, /* repeatable */ false); } diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index ea17bb7b9bb..ad3be7a5f39 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -673,7 +673,7 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { } } -void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) { +void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard, TTransactionContext& txc) { auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); if (!kqpTx.HasLocks()) { @@ -704,7 +704,7 @@ void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, } LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLocks: commit txId# " << txId << " in localTid# " << localTid); - txc.DB.CommitTx(localTid, txId); + txc.DB.CommitTx(localTid, txId, writeVersion); } } else { KqpEraseLocks(origin, tx, sysLocks); diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index 505db767966..370266cba60 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -37,7 +37,7 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); -void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc); +void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard, TTransactionContext& txc); void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters); diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index f56ed44503d..428341444df 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -129,6 +129,15 @@ void TLockInfo::SetBroken(TRowVersion at) { } } +void TLockInfo::OnRemoved() { + if (!IsBroken()) { + BreakVersion = TRowVersion::Min(); + Counter = Max<ui64>(); + Points.clear(); + Ranges.clear(); + } +} + void TLockInfo::PersistLock(ILocksDb* db) { Y_VERIFY(!IsPersistent()); Y_VERIFY(db, "Cannot persist lock without a db"); @@ -592,6 +601,8 @@ void TLockLocker::RemoveOneLock(ui64 lockTxId, ILocksDb* db) { Y_VERIFY(db, "Cannot remove persistent locks without a database"); txLock->PersistRemoveLock(db); } + + txLock->OnRemoved(); } } diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index 7c44359ef58..4bf66b34b92 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -315,6 +315,7 @@ private: bool AddRange(const TRangeKey& range); bool AddWriteLock(const TPathId& pathId); void SetBroken(TRowVersion at); + void OnRemoved(); void PersistAddRange(const TPathId& tableId, ELockRangeFlags flags, ILocksDb* db); @@ -608,7 +609,7 @@ struct TLocksUpdate { bool BreakOwn = false; bool HasLocks() const { - return bool(AffectedTables); + return bool(AffectedTables) || bool(ReadConflictLocks) || bool(WriteConflictLocks); } void AddRangeLock(const TRangeKey& range, ui64 lockId, ui32 lockNodeId) { diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index ecd4f660735..7ceefc319fc 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -2129,6 +2129,215 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { observer.InjectClearTasks = false; observer.InjectLocks.reset(); } + + std::unique_ptr<TEvDataShard::TEvRead> PrepareRead( + ui64 readId, + const TTableId& tableId, + const TRowVersion& snapshot, + const TVector<ui32>& columns) + { + auto request = std::make_unique<TEvDataShard::TEvRead>(); + auto& record = request->Record; + record.SetReadId(readId); + record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId); + record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId); + record.MutableTableId()->SetSchemaVersion(tableId.SchemaVersion); + record.MutableSnapshot()->SetStep(snapshot.Step); + record.MutableSnapshot()->SetTxId(snapshot.TxId); + for (ui32 columnId : columns) { + record.AddColumns(columnId); + } + record.SetResultFormat(NKikimrTxDataShard::CELLVEC); + return request; + } + + void AddReadRange(TEvDataShard::TEvRead& request, ui32 fromKey, ui32 toKey) { + TVector<TCell> fromKeyCells = { TCell::Make(fromKey) }; + TVector<TCell> toKeyCells = { TCell::Make(toKey) }; + auto fromBuf = TSerializedCellVec::Serialize(fromKeyCells); + auto toBuf = TSerializedCellVec::Serialize(toKeyCells); + request.Ranges.emplace_back(fromBuf, toBuf, true, true); + } + + TString ReadResultRowsString(const TEvDataShard::TEvReadResult& result) { + TStringBuilder builder; + for (size_t row = 0; row < result.GetRowsCount(); ++row) { + auto rowCells = result.GetCells(row); + for (size_t i = 0; i < rowCells.size(); ++i) { + if (i != 0) { + builder << ' '; + } + builder << rowCells[i].AsValue<ui32>(); + } + builder << '\n'; + } + return builder; + } + + Y_UNIT_TEST(MvccSnapshotReadLockedWrites) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Shards(1) + .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false}})); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value, value2 FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + TLockHandle lock2handle(234, runtime.GetActorSystem(0)); + TLockHandle lock3handle(345, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value, value2) VALUES (2, 21, 201) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 2 with tx 234 + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (2, 22) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 2 with tx 345 + observer.Inject.LockId = 345; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (2, 23) + )")), + "<empty>"); + auto locks3 = observer.LastLocks; + observer.Inject = {}; + + // Try to read uncommitted rows in tx 123 + { + auto readSender = runtime.AllocateEdgeActor(); + auto request = PrepareRead(1, tableId, snapshot, { 1, 2, 3 }); + AddReadRange(*request, 1, 3); + request->Record.SetLockTxId(123); + request->Record.SetLockNodeId(runtime.GetNodeId(0)); + auto clientId = runtime.ConnectToPipe(shards.at(0), readSender, 0, NKikimr::NTabletPipe::TClientConfig()); + runtime.SendToPipe(clientId, readSender, request.release()); + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender); + auto* response = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL( + ReadResultRowsString(*response), + "1 1 1\n" + "2 21 201\n"); + } + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Try to read uncommitted rows in tx 234 + { + auto readSender = runtime.AllocateEdgeActor(); + auto request = PrepareRead(1, tableId, snapshot, { 1, 2, 3 }); + AddReadRange(*request, 1, 3); + request->Record.SetLockTxId(234); + request->Record.SetLockNodeId(runtime.GetNodeId(0)); + auto clientId = runtime.ConnectToPipe(shards.at(0), readSender, 0, NKikimr::NTabletPipe::TClientConfig()); + runtime.SendToPipe(clientId, readSender, request.release()); + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender); + auto* response = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::ABORTED); + } + + // Try to read uncommitted rows in tx 345 + { + auto readSender = runtime.AllocateEdgeActor(); + auto request = PrepareRead(1, tableId, snapshot, { 1, 2, 3 }); + AddReadRange(*request, 1, 3); + request->Record.SetLockTxId(345); + request->Record.SetLockNodeId(runtime.GetNodeId(0)); + request->Record.SetMaxRowsInResult(1); + auto clientId = runtime.ConnectToPipe(shards.at(0), readSender, 0, NKikimr::NTabletPipe::TClientConfig()); + runtime.SendToPipe(clientId, readSender, request.release()); + { + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender); + auto* response = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL( + ReadResultRowsString(*response), + "1 1 1\n"); + } + { + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender); + auto* response = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::ABORTED); + } + } + } } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 692c9967c2f..546350603cd 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -168,7 +168,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio dataTx->SetReadVersion(readVersion); dataTx->SetWriteVersion(writeVersion); - KqpCommitLocks(tabletId, tx, DataShard, txc); + KqpCommitLocks(tabletId, tx, writeVersion, DataShard, txc); auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx(); diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 47b71bc6f96..6d984c50d9c 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -157,7 +157,8 @@ public: std::vector<NTable::TTag> Columns; TRowVersion ReadVersion = TRowVersion::Max(); bool IsHeadRead = false; - ui64 LockTxId = 0; + ui64 LockId = 0; + ui32 LockNodeId = 0; TLockInfo::TPtr Lock; // note that will be always overwritten by values from request diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h index 9cfd6bff8af..f10426c2a83 100644 --- a/ydb/core/tx/datashard/setup_sys_locks.h +++ b/ydb/core/tx/datashard/setup_sys_locks.h @@ -14,20 +14,19 @@ struct TSetupSysLocks TSetupSysLocks(TDataShard& self, ILocksDb* db) : SysLocksTable(self.SysLocksTable()) { - CheckVersion = TRowVersion::Min(); + CheckVersion = TRowVersion::Max(); BreakVersion = TRowVersion::Min(); SysLocksTable.SetTxUpdater(this); SysLocksTable.SetDb(db); } - TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, const TRowVersion& readVersion, - TDataShard& self, ILocksDb* db) + TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, TDataShard& self, ILocksDb* db) : SysLocksTable(self.SysLocksTable()) { LockTxId = lockTxId; LockNodeId = lockNodeId; - CheckVersion = readVersion; + CheckVersion = TRowVersion::Max(); BreakVersion = TRowVersion::Min(); SysLocksTable.SetTxUpdater(this); |