aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2022-09-19 23:55:20 +0300
committer snaury <snaury@ydb.tech> 2022-09-19 23:55:20 +0300
commit ba94c55d49e858bea72c265a7a99fd7da200674f (patch)
tree 9dd04b5ab60acb5378bda448e524b5daabc6e1fc
parent f004dcc272c4ae5004e677f4fa18c07064f1beef (diff)
download ydb-ba94c55d49e858bea72c265a7a99fd7da200674f.tar.gz
Change visibility in read iterators, conflict processing
-rw-r--r-- ydb/core/tx/datashard/datashard__read_iterator.cpp 192
-rw-r--r-- ydb/core/tx/datashard/datashard_dep_tracker.cpp 6
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp.cpp 4
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp.h 2
-rw-r--r-- ydb/core/tx/datashard/datashard_locks.cpp 11
-rw-r--r-- ydb/core/tx/datashard/datashard_locks.h 3
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_snapshot.cpp 209
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp 2
-rw-r--r-- ydb/core/tx/datashard/read_iterator.h 3
-rw-r--r-- ydb/core/tx/datashard/setup_sys_locks.h 7
10 files changed, 388 insertions, 51 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 97f7107c562..75c346ea865 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -225,6 +225,7 @@ class TReader {
const TReadIteratorState& State;
IBlockBuilder& BlockBuilder;
const TShortTableInfo& TableInfo;
+ TDataShard* Self;
std::vector<NKikimr::NScheme::TTypeId> ColumnTypes;
@@ -236,13 +237,17 @@ class TReader {
ui64 BytesInResult = 0;
- bool InvisibleRowSkipsMet = false;
+ bool HadInvisibleRowSkips_ = false;
+ bool HadInconsistentResult_ = false;
NHPTimer::STime StartTime;
NHPTimer::STime EndTime;
static const NHPTimer::STime MaxCyclesPerIteration;
+ NTable::ITransactionMapPtr TxMap;
+ NTable::ITransactionObserverPtr TxObserver;
+
enum class EReadStatus {
Done = 0,
NeedData,
@@ -252,10 +257,12 @@ class TReader {
public:
TReader(TReadIteratorState& state,
IBlockBuilder& blockBuilder,
- const TShortTableInfo& tableInfo)
+ const TShortTableInfo& tableInfo,
+ TDataShard* self)
: State(state)
, BlockBuilder(blockBuilder)
, TableInfo(tableInfo)
+ , Self(self)
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
{
GetTimeFast(&StartTime);
@@ -292,10 +299,10 @@ public:
EReadStatus result;
if (!reverse) {
- auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion);
+ auto iter = txc.DB.IterateRange(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
result = Iterate(iter.Get(), true, ctx);
} else {
- auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion);
+ auto iter = txc.DB.IterateRangeReverse(TableInfo.LocalTid, iterRange, State.Columns, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
result = Iterate(iter.Get(), true, ctx);
}
@@ -342,9 +349,9 @@ public:
NTable::TRowState rowState;
rowState.Init(State.Columns.size());
NTable::TSelectStats stats;
- auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion);
+ auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion, GetReadTxMap(), GetReadTxObserver());
RowsSinceLastCheck += 1 + stats.InvisibleRowSkips;
- InvisibleRowSkipsMet |= stats.InvisibleRowSkips > 0;
+ HadInvisibleRowSkips_ |= stats.InvisibleRowSkips > 0;
if (ready == NTable::EReady::Page) {
return EReadStatus::NeedData;
}
@@ -505,7 +512,8 @@ public:
}
ui64 GetRowsRead() const { return RowsRead; }
- bool HasInvisibleRowSkips() const { return InvisibleRowSkipsMet; }
+ bool HadInvisibleRowSkips() const { return HadInvisibleRowSkips_; }
+ bool HadInconsistentResult() const { return HadInconsistentResult_; }
private:
bool OutOfQuota() const {
@@ -563,7 +571,7 @@ private:
BlockBuilder.AddRow(TDbTupleRef(), rowValues);
++RowsRead;
- InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0;
+ HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0;
RowsSinceLastCheck += 1 + ResetRowStats(iter->Stats);
if (ShouldStop()) {
return EReadStatus::StoppedByLimit;
@@ -572,7 +580,7 @@ private:
// last iteration to Page or Gone also might have deleted or invisible rows
RowsSinceLastCheck += ResetRowStats(iter->Stats);
- InvisibleRowSkipsMet |= iter->Stats.InvisibleRowSkips > 0;
+ HadInvisibleRowSkips_ |= iter->Stats.InvisibleRowSkips > 0;
// TODO: consider restart when Page and too few data read
// (how much is too few, less than user's limit?)
@@ -586,6 +594,81 @@ private:
return EReadStatus::Done;
}
+
+ const NTable::ITransactionMapPtr& GetReadTxMap() { // Lazily creates and caches the tx map handed to IterateRange/IterateRangeReverse/Select
+ if (!TxMap &&
+ State.LockId &&
+ !TSysTables::IsSystemTable(State.PathId) &&
+ Self->SysLocksTable().HasWriteLock(State.LockId, State.PathId))
+ {
+ TxMap = new NTable::TSingleTransactionMap(State.LockId, TRowVersion::Min()); // NOTE(review): presumably makes our own uncommitted (LockId) writes visible to this read -- confirm against TSingleTransactionMap
+ }
+
+ return TxMap; // stays unset without a lock id, for system tables, or when this lock has no write lock on the table
+ }
+
+ const NTable::ITransactionObserverPtr& GetReadTxObserver() { // Lazily creates and caches the observer handed to IterateRange/IterateRangeReverse/Select
+ if (!TxObserver &&
+ State.LockId &&
+ !TSysTables::IsSystemTable(State.PathId) &&
+ Self->SysLocksTable().HasWriteLocks(State.PathId)) // unlike GetReadTxMap, this asks about write locks on the table without passing our lock id
+ {
+ TxObserver = new TReadTxObserver(this); // the observer calls back into this reader
+ }
+
+ return TxObserver; // stays unset when any of the conditions above does not hold
+ }
+
+ class TReadTxObserver : public NTable::ITransactionObserver { // Relays transaction observer callbacks to the owning TReader
+ public:
+ TReadTxObserver(TReader* reader)
+ : Reader(reader)
+ {
+ }
+
+ void OnSkipUncommitted(ui64 txId) override { // uncommitted changes of txId were skipped: record a read conflict
+ Reader->AddReadConflict(txId);
+ }
+
+ void OnSkipCommitted(const TRowVersion&) override {
+ // We already use InvisibleRowSkips for these
+ }
+
+ void OnSkipCommitted(const TRowVersion&, ui64) override {
+ // We already use InvisibleRowSkips for these
+ }
+
+ void OnApplyCommitted(const TRowVersion& rowVersion) override { // committed changes were applied: check them against the read snapshot
+ Reader->CheckReadConflict(rowVersion);
+ }
+
+ void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override { // same check; the extra ui64 argument is ignored here
+ Reader->CheckReadConflict(rowVersion);
+ }
+
+ private:
+ TReader* const Reader; // non-owning back-pointer to the reader that created this observer
+ };
+
+ void AddReadConflict(ui64 txId) { // Registers a read conflict between our lock and the uncommitted transaction txId
+ Y_VERIFY(State.LockId); // the observer that calls this is only created when State.LockId is set
+ // We have skipped uncommitted changes in txId, which would affect
+ // the read result when it commits. Add a conflict edge that breaks
+ // our lock when txId is committed.
+ Self->SysLocksTable().AddReadConflict(txId, State.LockId, State.LockNodeId);
+ }
+
+ void CheckReadConflict(const TRowVersion& rowVersion) { // Flags the read as inconsistent when an applied change is newer than the read snapshot
+ if (rowVersion > State.ReadVersion) {
+ // We have applied changes from a version above our snapshot.
+ // Normally these changes are skipped (since we are reading from a
+ // snapshot), but if we have previously written changes for a key
+ // modified by transactions after our snapshot, we would hit this
+ // code path. We have to break our own lock and make sure we won't
+ // reply with inconsistent results.
+ HadInconsistentResult_ = true; // only sets the flag; callers read it through HadInconsistentResult()
+ }
+ }
};
const NHPTimer::STime TReader::MaxCyclesPerIteration =
@@ -756,6 +839,12 @@ public:
}
}
+ state.LockId = state.Request->Record.GetLockTxId();
+ state.LockNodeId = state.Request->Record.GetLockNodeId();
+
+ TDataShardLocksDb locksDb(*Self, txc);
+ TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb);
+
if (!Read(txc, ctx, state))
return EExecutionStatus::Restart;
@@ -793,10 +882,16 @@ public:
bool hadWrites = false;
- if (Request->Record.HasLockTxId()) {
+ if (state.LockId) {
// note that we set locks only when the first read finishes its transaction,
// i.e. we have read something without page faults
- hadWrites |= AcquireLock(txc, ctx, state);
+ AcquireLock(state, ctx);
+
+ // Make sure we wait for commit (e.g. persisted lock added a write range)
+ hadWrites |= locksDb.HasChanges();
+
+ // We remember acquired lock for faster checking
+ state.Lock = guard.Lock;
}
if (!Self->IsFollower()) {
@@ -1026,15 +1121,18 @@ public:
if (!Result) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TReadOperation::Execute() finished without Result, aborting");
- Self->DeleteReadIterator(it);
-
Result.reset(new TEvDataShard::TEvReadResult());
SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+ Self->DeleteReadIterator(it);
return;
}
+ if (!Result->Record.HasStatus() && Reader && Reader->HadInconsistentResult()) {
+ SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Read conflict with concurrent transaction");
+ }
+
// error happened and status set
auto& record = Result->Record;
if (record.HasStatus()) {
@@ -1074,6 +1172,12 @@ public:
SendResult(ctx);
+ it = Self->ReadIterators.find(readId);
+ if (it == Self->ReadIterators.end()) {
+ // We sent an error and deleted the iterator
+ return;
+ }
+
Y_VERIFY(it->second);
// note that we save the state only when there're unread queries
@@ -1188,7 +1292,7 @@ private:
Y_ASSERT(Result);
- Reader.reset(new TReader(state, *BlockBuilder, TableInfo));
+ Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self));
return Reader->Read(txc, ctx);
}
@@ -1273,18 +1377,11 @@ private:
ValidationInfo.Loaded = true;
}
- bool AcquireLock(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) {
+ void AcquireLock(TReadIteratorState& state, const TActorContext& ctx) {
auto& sysLocks = Self->SysLocksTable();
- const auto lockTxId = state.Request->Record.GetLockTxId();
- const auto lockNodeId = state.Request->Record.GetLockNodeId();
TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion);
- state.LockTxId = lockTxId;
-
- TDataShardLocksDb locksDb(*Self, txc);
- TSetupSysLocks guard(lockTxId, lockNodeId, state.ReadVersion, *Self, &locksDb);
-
if (!state.Request->Keys.empty()) {
for (size_t i = 0; i < state.Request->Keys.size(); ++i) {
const auto& key = state.Request->Keys[i];
@@ -1295,21 +1392,21 @@ private:
true,
key.GetCells(),
true);
- sysLocks.SetLock(tableId, lockRange, lockTxId, lockNodeId);
+ sysLocks.SetLock(tableId, lockRange, state.LockId, state.LockNodeId);
} else {
- sysLocks.SetLock(tableId, key.GetCells(), lockTxId, lockNodeId);
+ sysLocks.SetLock(tableId, key.GetCells(), state.LockId, state.LockNodeId);
}
}
} else {
// no keys, so we must have ranges (has been checked initially)
for (size_t i = 0; i < state.Request->Ranges.size(); ++i) {
auto range = state.Request->Ranges[i].ToTableRange();
- sysLocks.SetLock(tableId, range, lockTxId, lockNodeId);
+ sysLocks.SetLock(tableId, range, state.LockId, state.LockNodeId);
}
}
- if (Reader->HasInvisibleRowSkips()) {
- sysLocks.BreakSetLocks(lockTxId, lockNodeId);
+ if (Reader->HadInvisibleRowSkips() || Reader->HadInconsistentResult()) {
+ sysLocks.BreakSetLocks(state.LockId, state.LockNodeId);
}
auto locks = sysLocks.ApplyLocks();
@@ -1333,9 +1430,6 @@ private:
<< " Acquired lock# " << lock.LockId << ", counter# " << lock.Counter
<< " for " << state.PathId);
}
-
- state.Lock = guard.Lock; // will be nullptr if broken
- return locksDb.HasChanges();
}
};
@@ -1589,7 +1683,10 @@ public:
<< " ReadContinue: reader# " << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId
<< ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery);
- Reader.reset(new TReader(state, *BlockBuilder, TableInfo));
+ TDataShardLocksDb locksDb(*Self, txc);
+ TSetupSysLocks guard(state.LockId, state.LockNodeId, *Self, &locksDb);
+
+ Reader.reset(new TReader(state, *BlockBuilder, TableInfo, Self));
if (Reader->Read(txc, ctx)) {
SendResult(txc, ctx);
return true;
@@ -1602,6 +1699,8 @@ public:
}
void SendResult(TTransactionContext& txc, const TActorContext& ctx) {
+ Y_UNUSED(txc);
+
const auto* request = Ev->Get();
TReadIteratorId readId(request->Reader, request->ReadId);
auto it = Self->ReadIterators.find(readId);
@@ -1612,12 +1711,12 @@ public:
if (!Result) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxReadContinue::Execute() finished without Result, aborting");
- Self->DeleteReadIterator(it);
Result.reset(new TEvDataShard::TEvReadResult());
SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
Result->Record.SetReadId(readId.ReadId);
Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
+ Self->DeleteReadIterator(it);
return;
}
@@ -1633,12 +1732,14 @@ public:
return;
}
+ Y_ASSERT(Reader);
+ Y_ASSERT(BlockBuilder);
+
if (state.Lock) {
- bool isBroken = state.Lock->IsBroken(state.ReadVersion);
- if (!isBroken && Reader->HasInvisibleRowSkips()) {
- auto& sysLocks = Self->SysLocksTable();
- TDataShardLocksDb locksDb(*Self, txc);
- TSetupSysLocks guard(*Self, &locksDb);
+ auto& sysLocks = Self->SysLocksTable();
+
+ bool isBroken = state.Lock->IsBroken();
+ if (!isBroken && (Reader->HadInvisibleRowSkips() || Reader->HadInconsistentResult())) {
sysLocks.BreakLock(state.Lock->GetLockId());
sysLocks.ApplyLocks();
Y_VERIFY(state.Lock->IsBroken());
@@ -1657,13 +1758,24 @@ public:
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
<< " TTxReadContinue::Execute() found broken lock# " << state.Lock->GetLockId());
+ // A broken write lock means we are reading inconsistent results and must abort
+ if (state.Lock->IsWriteLock()) {
+ SetStatusError(record, Ydb::StatusIds::ABORTED, "Read conflict with concurrent transaction");
+ record.SetSeqNo(state.SeqNo + 1);
+ record.SetReadId(readId.ReadId);
+ Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
+ Self->DeleteReadIterator(it);
+ return;
+ }
+
state.Lock = nullptr;
+ } else {
+ // Lock valid, apply conflict changes
+ auto locks = sysLocks.ApplyLocks();
+ Y_VERIFY(locks.empty(), "ApplyLocks acquired unexpected locks");
}
}
- Y_ASSERT(Reader);
- Y_ASSERT(BlockBuilder);
-
Reader->FillResult(*Result);
Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
index 1a35cb5092a..0fc88d43d05 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
@@ -619,6 +619,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
// First pass, gather all reads/writes expanded with locks, add lock based dependencies
bool haveReads = false;
bool haveWrites = false;
+ bool haveWriteLock = false;
if (haveKeys) {
size_t keysCount = 0;
const auto& keysInfo = op->GetKeysInfo();
@@ -701,6 +702,9 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
}
}
}
+ if (lock->IsWriteLock()) {
+ haveWriteLock = true;
+ }
}
}
}
@@ -731,7 +735,7 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
if (op->IsMvccSnapshotRead()) {
snapshot = op->GetMvccSnapshot();
snapshotRepeatable = op->IsMvccSnapshotRepeatable();
- } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter)) {
+ } else if (op->IsImmediate() && (op->IsReadTable() || op->IsDataTx() && !haveWrites && !isGlobalWriter && !haveWriteLock)) {
snapshot = readVersion;
op->SetMvccSnapshot(snapshot, /* repeatable */ false);
}
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index ea17bb7b9bb..ad3be7a5f39 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -673,7 +673,7 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
}
}
-void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) {
+void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard, TTransactionContext& txc) {
auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
if (!kqpTx.HasLocks()) {
@@ -704,7 +704,7 @@ void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard,
}
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLocks: commit txId# " << txId << " in localTid# " << localTid);
- txc.DB.CommitTx(localTid, txId);
+ txc.DB.CommitTx(localTid, txId, writeVersion);
}
} else {
KqpEraseLocks(origin, tx, sysLocks);
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 505db767966..370266cba60 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -37,7 +37,7 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
-void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc);
+void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard, TTransactionContext& txc);
void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters);
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index f56ed44503d..428341444df 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -129,6 +129,15 @@ void TLockInfo::SetBroken(TRowVersion at) {
}
}
+void TLockInfo::OnRemoved() { // Called by TLockLocker::RemoveOneLock; resets the state of a lock that is not already broken
+ if (!IsBroken()) {
+ BreakVersion = TRowVersion::Min(); // NOTE(review): presumably makes remaining holders of this TLockInfo see it as broken -- confirm against IsBroken()
+ Counter = Max<ui64>();
+ Points.clear(); // drop the point and range keys recorded for this lock
+ Ranges.clear();
+ }
+}
+
void TLockInfo::PersistLock(ILocksDb* db) {
Y_VERIFY(!IsPersistent());
Y_VERIFY(db, "Cannot persist lock without a db");
@@ -592,6 +601,8 @@ void TLockLocker::RemoveOneLock(ui64 lockTxId, ILocksDb* db) {
Y_VERIFY(db, "Cannot remove persistent locks without a database");
txLock->PersistRemoveLock(db);
}
+
+ txLock->OnRemoved();
}
}
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 7c44359ef58..4bf66b34b92 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -315,6 +315,7 @@ private:
bool AddRange(const TRangeKey& range);
bool AddWriteLock(const TPathId& pathId);
void SetBroken(TRowVersion at);
+ void OnRemoved();
void PersistAddRange(const TPathId& tableId, ELockRangeFlags flags, ILocksDb* db);
@@ -608,7 +609,7 @@ struct TLocksUpdate {
bool BreakOwn = false;
bool HasLocks() const {
- return bool(AffectedTables);
+ return bool(AffectedTables) || bool(ReadConflictLocks) || bool(WriteConflictLocks);
}
void AddRangeLock(const TRangeKey& range, ui64 lockId, ui32 lockNodeId) {
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index ecd4f660735..7ceefc319fc 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -2129,6 +2129,215 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
observer.InjectClearTasks = false;
observer.InjectLocks.reset();
}
+
+ std::unique_ptr<TEvDataShard::TEvRead> PrepareRead( // Builds a TEvRead request for the given table, snapshot and columns; ranges/keys are added by the caller
+ ui64 readId,
+ const TTableId& tableId,
+ const TRowVersion& snapshot,
+ const TVector<ui32>& columns)
+ {
+ auto request = std::make_unique<TEvDataShard::TEvRead>();
+ auto& record = request->Record;
+ record.SetReadId(readId);
+ record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
+ record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
+ record.MutableTableId()->SetSchemaVersion(tableId.SchemaVersion);
+ record.MutableSnapshot()->SetStep(snapshot.Step); // read at the explicit snapshot (step, txId)
+ record.MutableSnapshot()->SetTxId(snapshot.TxId);
+ for (ui32 columnId : columns) {
+ record.AddColumns(columnId);
+ }
+ record.SetResultFormat(NKikimrTxDataShard::CELLVEC); // ReadResultRowsString reads the result via GetCells(), so cell vectors are requested
+ return request;
+ }
+
+ void AddReadRange(TEvDataShard::TEvRead& request, ui32 fromKey, ui32 toKey) { // Appends a range over single-column ui32 keys [fromKey, toKey] to the request
+ TVector<TCell> fromKeyCells = { TCell::Make(fromKey) };
+ TVector<TCell> toKeyCells = { TCell::Make(toKey) };
+ auto fromBuf = TSerializedCellVec::Serialize(fromKeyCells);
+ auto toBuf = TSerializedCellVec::Serialize(toKeyCells);
+ request.Ranges.emplace_back(fromBuf, toBuf, true, true); // NOTE(review): the two flags are presumably from-inclusive / to-inclusive -- confirm against the range type
+ }
+
+ TString ReadResultRowsString(const TEvDataShard::TEvReadResult& result) { // Formats result rows as text: cells separated by ' ', each row terminated by '\n'
+ TStringBuilder builder;
+ for (size_t row = 0; row < result.GetRowsCount(); ++row) {
+ auto rowCells = result.GetCells(row);
+ for (size_t i = 0; i < rowCells.size(); ++i) {
+ if (i != 0) {
+ builder << ' ';
+ }
+ builder << rowCells[i].AsValue<ui32>(); // every cell is interpreted as ui32
+ }
+ builder << '\n';
+ }
+ return builder;
+ }
+
+ Y_UNIT_TEST(MvccSnapshotReadLockedWrites) { // Snapshot reads issued under lock ids 123/234/345 that each hold an uncommitted write to key 2
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+ .Shards(1)
+ .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false}}));
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ auto tableId = ResolveTableId(server, sender, "/Root/table-1");
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1)")); // seed row that the snapshot below will contain
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+ TLockHandle lock2handle(234, runtime.GetActorSystem(0));
+ TLockHandle lock3handle(345, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value, value2) VALUES (2, 21, 201)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks; // kept so that tx 123 can be committed later through InjectLocks
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 2 with tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (2, 22)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks; // not referenced again in this test
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 2 with tx 345
+ observer.Inject.LockId = 345;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (2, 23)
+ )")),
+ "<empty>");
+ auto locks3 = observer.LastLocks; // not referenced again in this test
+ observer.Inject = {};
+
+ // Try to read uncommitted rows in tx 123
+ {
+ auto readSender = runtime.AllocateEdgeActor();
+ auto request = PrepareRead(1, tableId, snapshot, { 1, 2, 3 });
+ AddReadRange(*request, 1, 3);
+ request->Record.SetLockTxId(123);
+ request->Record.SetLockNodeId(runtime.GetNodeId(0));
+ auto clientId = runtime.ConnectToPipe(shards.at(0), readSender, 0, NKikimr::NTabletPipe::TClientConfig());
+ runtime.SendToPipe(clientId, readSender, request.release());
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender);
+ auto* response = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); // nothing else is committed yet, so tx 123 reads its own write to key 2
+ UNIT_ASSERT_VALUES_EQUAL(
+ ReadResultRowsString(*response),
+ "1 1 1\n"
+ "2 21 201\n");
+ }
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1; // NOTE(review): presumably attaches tx 123's locks to the next query so that it commits them -- confirm against TInjectLockSnapshotObserver
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Try to read uncommitted rows in tx 234
+ {
+ auto readSender = runtime.AllocateEdgeActor();
+ auto request = PrepareRead(1, tableId, snapshot, { 1, 2, 3 });
+ AddReadRange(*request, 1, 3);
+ request->Record.SetLockTxId(234);
+ request->Record.SetLockNodeId(runtime.GetNodeId(0));
+ auto clientId = runtime.ConnectToPipe(shards.at(0), readSender, 0, NKikimr::NTabletPipe::TClientConfig());
+ runtime.SendToPipe(clientId, readSender, request.release());
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender);
+ auto* response = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::ABORTED); // tx 234 also wrote key 2 and tx 123 has since committed; presumably reported through the reader's inconsistent-result flag
+ }
+
+ // Try to read uncommitted rows in tx 345
+ {
+ auto readSender = runtime.AllocateEdgeActor();
+ auto request = PrepareRead(1, tableId, snapshot, { 1, 2, 3 });
+ AddReadRange(*request, 1, 3);
+ request->Record.SetLockTxId(345);
+ request->Record.SetLockNodeId(runtime.GetNodeId(0));
+ request->Record.SetMaxRowsInResult(1); // one row per result, so this read arrives as two TEvReadResult messages
+ auto clientId = runtime.ConnectToPipe(shards.at(0), readSender, 0, NKikimr::NTabletPipe::TClientConfig());
+ runtime.SendToPipe(clientId, readSender, request.release());
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender);
+ auto* response = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); // first result holds only key 1, which has no conflict
+ UNIT_ASSERT_VALUES_EQUAL(
+ ReadResultRowsString(*response),
+ "1 1 1\n");
+ }
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(readSender);
+ auto* response = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus().GetCode(), Ydb::StatusIds::ABORTED); // the continuation reaches key 2 and must abort instead of returning inconsistent data
+ }
+ }
+ }
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 692c9967c2f..546350603cd 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -168,7 +168,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
dataTx->SetReadVersion(readVersion);
dataTx->SetWriteVersion(writeVersion);
- KqpCommitLocks(tabletId, tx, DataShard, txc);
+ KqpCommitLocks(tabletId, tx, writeVersion, DataShard, txc);
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 47b71bc6f96..6d984c50d9c 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -157,7 +157,8 @@ public:
std::vector<NTable::TTag> Columns;
TRowVersion ReadVersion = TRowVersion::Max();
bool IsHeadRead = false;
- ui64 LockTxId = 0;
+ ui64 LockId = 0;
+ ui32 LockNodeId = 0;
TLockInfo::TPtr Lock;
// note that will be always overwritten by values from request
diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h
index 9cfd6bff8af..f10426c2a83 100644
--- a/ydb/core/tx/datashard/setup_sys_locks.h
+++ b/ydb/core/tx/datashard/setup_sys_locks.h
@@ -14,20 +14,19 @@ struct TSetupSysLocks
TSetupSysLocks(TDataShard& self, ILocksDb* db)
: SysLocksTable(self.SysLocksTable())
{
- CheckVersion = TRowVersion::Min();
+ CheckVersion = TRowVersion::Max();
BreakVersion = TRowVersion::Min();
SysLocksTable.SetTxUpdater(this);
SysLocksTable.SetDb(db);
}
- TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, const TRowVersion& readVersion,
- TDataShard& self, ILocksDb* db)
+ TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, TDataShard& self, ILocksDb* db)
: SysLocksTable(self.SysLocksTable())
{
LockTxId = lockTxId;
LockNodeId = lockNodeId;
- CheckVersion = readVersion;
+ CheckVersion = TRowVersion::Max();
BreakVersion = TRowVersion::Min();
SysLocksTable.SetTxUpdater(this);