about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2022-09-23 16:58:42 +0300
committer snaury <snaury@ydb.tech> 2022-09-23 16:58:42 +0300
commit 7c520dd48df863dfd949097ee2842a4781031d31 (patch)
tree 18202e1246d1a7bbcad498d7bec8824186b38f3c
parent a155c78ca37fc2f240ed541cbb1ba56b631bd9e2 (diff)
download ydb-7c520dd48df863dfd949097ee2842a4781031d31.tar.gz
Handle read conflicts in kqp transactions
-rw-r--r-- ydb/core/tablet_flat/flat_mem_iter.h 3
-rw-r--r-- ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp 3
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.cpp 20
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.h 1
-rw-r--r-- ydb/core/tx/datashard/datashard_active_transaction.h 1
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp.cpp 4
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp_compute.cpp 21
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp_compute.h 6
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp 4
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp_read_table.cpp 2
-rw-r--r-- ydb/core/tx/datashard/datashard_locks.cpp 42
-rw-r--r-- ydb/core/tx/datashard/datashard_locks.h 8
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_snapshot.cpp 159
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp 49
14 files changed, 306 insertions, 17 deletions
diff --git a/ydb/core/tablet_flat/flat_mem_iter.h b/ydb/core/tablet_flat/flat_mem_iter.h
index 9d809ef3ad..5de8862b8c 100644
--- a/ydb/core/tablet_flat/flat_mem_iter.h
+++ b/ydb/core/tablet_flat/flat_mem_iter.h
@@ -206,6 +206,9 @@ namespace NTable {
ApplyColumn(row, up);
}
}
+ if (isDelta && row.IsFinalized()) {
+ break;
+ }
} else {
transactionObserver.OnSkipUncommitted(update->RowVersion.TxId);
}
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
index ba28e01ba2..27253f6eca 100644
--- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
@@ -99,6 +99,9 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) {
auto result = KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner);
+ Y_VERIFY_S(!dataTx->GetKqpComputeCtx().HadInconsistentReads(),
+ "Unexpected inconsistent reads in operation " << *op << " when preparing persistent channels");
+
if (result == NYql::NDq::ERunStatus::PendingInput && dataTx->GetKqpComputeCtx().IsTabletNotReady()) {
allocGuard.Release();
return OnTabletNotReady(*tx, *dataTx, txc, ctx);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 294b14beed..41f3173286 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -280,13 +280,14 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) {
///
class TDataShardEngineHost : public TEngineHost {
public:
- TDataShardEngineHost(TDataShard* self, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now)
+ TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now)
: TEngineHost(db, counters,
TEngineHostSettings(self->TabletID(),
(self->State == TShardState::Readonly || self->State == TShardState::Frozen),
self->ByKeyFilterDisabled(),
self->GetKeyAccessSampler()))
, Self(self)
+ , EngineBay(engineBay)
, DB(db)
, LockTxId(lockTxId)
, LockNodeId(lockNodeId)
@@ -351,6 +352,14 @@ public:
return total;
}
+ void ResetCollectedChanges() {
+ for (auto& pr : ChangeCollectors) {
+ if (pr.second) {
+ pr.second->Reset();
+ }
+ }
+ }
+
bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override {
if (TSysTables::IsSystemTable(key.TableId))
return DataShardSysTable(key.TableId).IsValidKey(key);
@@ -620,6 +629,7 @@ public:
// and that future conflict, hence we must break locks and abort.
// TODO: add an actual abort
Self->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId);
+ EngineBay.GetKqpComputeCtx().SetInconsistentReads();
}
}
@@ -698,6 +708,7 @@ private:
}
TDataShard* Self;
+ TEngineBay& EngineBay;
NTable::TDatabase& DB;
const ui64& LockTxId;
const ui32& LockNodeId;
@@ -720,7 +731,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor
, LockNodeId(0)
{
auto now = TAppData::TimeProvider->Now();
- EngineHost = MakeHolder<TDataShardEngineHost>(self, txc.DB, EngineHostCounters, LockTxId, LockNodeId, now);
+ EngineHost = MakeHolder<TDataShardEngineHost>(self, *this, txc.DB, EngineHostCounters, LockTxId, LockNodeId, now);
EngineSettings = MakeHolder<TEngineFlatSettings>(IEngineFlat::EProtocol::V1, AppData(ctx)->FunctionRegistry,
*TAppData::RandomProvider, *TAppData::TimeProvider, EngineHost.Get(), self->AllocCounters);
@@ -900,6 +911,11 @@ TVector<IChangeCollector::TChange> TEngineBay::GetCollectedChanges() const {
return host->GetCollectedChanges();
}
+void TEngineBay::ResetCollectedChanges() {
+ auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
+ host->ResetCollectedChanges();
+}
+
IEngineFlat * TEngineBay::GetEngine() {
if (!Engine) {
Engine = CreateEngineFlat(*EngineSettings);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index a960da0c9f..06ddf773c1 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -101,6 +101,7 @@ public:
void SetIsRepeatableSnapshot();
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const;
+ void ResetCollectedChanges();
void ResetCounters() { EngineHostCounters = TEngineHostCounters(); }
const TEngineHostCounters& GetCounters() const { return EngineHostCounters; }
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index d206e7d0bf..f2b882e785 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -181,6 +181,7 @@ public:
void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); }
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const { return EngineBay.GetCollectedChanges(); }
+ void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); }
TActorId Source() const { return Source_; }
void SetSource(const TActorId& actorId) { Source_ = actorId; }
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index ad3be7a5f3..92fbe03e4b 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -478,6 +478,10 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const
{
auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, tasks, tasksRunner, /* applyEffects */ true);
+ if (computeCtx.HadInconsistentReads()) {
+ return nullptr;
+ }
+
if (runStatus == NYql::NDq::ERunStatus::PendingInput && computeCtx.IsTabletNotReady()) {
return nullptr;
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index 2db6e83517..e85c6f81b9 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -400,14 +400,15 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con
EngineHost.GetReadTxMap(tableId),
EngineHost.GetReadTxObserver(tableId));
+ if (InconsistentReads) {
+ return false;
+ }
+
kqpStats.NSelectRow = 1;
kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips;
switch (ready) {
case EReady::Page:
- if (auto collector = EngineHost.GetChangeCollector(tableId)) {
- collector->Reset();
- }
SetTabletNotReady();
return false;
case EReady::Gone:
@@ -485,6 +486,10 @@ bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTabl
const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats)
{
while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) {
+ if (InconsistentReads) {
+ return false;
+ }
+
TDbTupleRef rowKey = iterator.GetKey();
MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount);
@@ -521,9 +526,6 @@ bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTabl
}
if (iterator.Last() == NTable::EReady::Page) {
- if (auto collector = EngineHost.GetChangeCollector(tableId)) {
- collector->Reset();
- }
SetTabletNotReady();
}
@@ -536,6 +538,10 @@ bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TRead
NUdf::TUnboxedValue* const* result, TKqpTableStats& stats)
{
while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) {
+ if (InconsistentReads) {
+ return false;
+ }
+
TDbTupleRef rowKey = iterator.GetKey();
MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount);
@@ -572,9 +578,6 @@ bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TRead
}
if (iterator.Last() == NTable::EReady::Page) {
- if (auto collector = EngineHost.GetChangeCollector(tableId)) {
- collector->Reset();
- }
SetTabletNotReady();
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h
index d547030db8..3dbea628f3 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.h
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.h
@@ -83,6 +83,9 @@ public:
const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
NUdf::TUnboxedValue* const* result, TKqpTableStats& stats);
+ bool HadInconsistentReads() const { return InconsistentReads; }
+ void SetInconsistentReads() { InconsistentReads = true; }
+
private:
void TouchTableRange(const TTableId& tableId, const TTableRange& range) const;
void TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const;
@@ -97,7 +100,7 @@ private:
const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
NUdf::TUnboxedValue* const* result, TKqpTableStats& stats);
- void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; };
+ void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; }
public:
NTable::TDatabase* Database = nullptr;
@@ -111,6 +114,7 @@ private:
ui32 LockNodeId = 0;
bool PersistentChannels = false;
bool TabletNotReady = false;
+ bool InconsistentReads = false;
TRowVersion ReadVersion = TRowVersion::Min();
THashMap<std::pair<ui64, ui64>, TActorId> OutputChannels;
};
diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
index e1938769fd..8cd0d4de4a 100644
--- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
@@ -127,7 +127,7 @@ public:
return std::move(result);
}
- if (ComputeCtx.IsTabletNotReady()) {
+ if (ComputeCtx.IsTabletNotReady() || ComputeCtx.HadInconsistentReads()) {
return NUdf::TUnboxedValue::MakeYield();
}
@@ -234,7 +234,7 @@ public:
return result;
}
- if (ComputeCtx.IsTabletNotReady()) {
+ if (ComputeCtx.IsTabletNotReady() || ComputeCtx.HadInconsistentReads()) {
return NUdf::TUnboxedValue::MakeYield();
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
index c57a8a3840..2054daaade 100644
--- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
@@ -225,7 +225,7 @@ protected:
return EFetchResult::One;
}
- if (ComputeCtx.IsTabletNotReady()) {
+ if (ComputeCtx.IsTabletNotReady() || ComputeCtx.HadInconsistentReads()) {
return EFetchResult::Yield;
}
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index 428341444d..cffc2a0340 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -689,6 +689,28 @@ void TLockLocker::SaveBrokenPersistentLocks(ILocksDb* db) {
}
}
+// TLocksUpdate
+
+TLocksUpdate::~TLocksUpdate() {
+ auto cleanList = [](auto& list) {
+ while (!list.Empty()) {
+ list.PopFront();
+ }
+ };
+
+ // We clean all lists to make sure items are not left linked together
+ cleanList(ReadTables);
+ cleanList(WriteTables);
+ cleanList(AffectedTables);
+ cleanList(BreakLocks);
+ cleanList(BreakShardLocks);
+ cleanList(BreakAllLocks);
+ cleanList(ReadConflictLocks);
+ cleanList(WriteConflictLocks);
+ cleanList(WriteConflictShardLocks);
+ cleanList(EraseLocks);
+}
+
// TSysLocks
TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
@@ -1038,16 +1060,32 @@ bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const {
}
bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const {
+ if (Update && Update->LockTxId == lockId && Update->WriteTables) {
+ if (auto* table = Locker.FindTablePtr(tableId.PathId)) {
+ if (table->IsInList<TTableLocksWriteListTag>()) {
+ return true;
+ }
+ }
+ }
+
if (auto* lock = Locker.FindLockPtr(lockId)) {
- return lock->WriteTables.contains(tableId.PathId);
+ if (lock->WriteTables.contains(tableId.PathId)) {
+ return true;
+ }
}
return false;
}
bool TSysLocks::HasWriteLocks(const TTableId& tableId) const {
+ if (Update && Update->WriteTables) {
+ return true;
+ }
+
if (auto* table = Locker.FindTablePtr(tableId.PathId)) {
- return !table->WriteLocks.empty();
+ if (!table->WriteLocks.empty()) {
+ return true;
+ }
}
return false;
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 4bf66b34b9..d33a977756 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -377,6 +377,12 @@ public:
: TableId(tableId)
{}
+ template<class TTag>
+ bool IsInList() const {
+ using TItem = TIntrusiveListItem<TTableLocks, TTag>;
+ return !static_cast<const TItem*>(this)->Empty();
+ }
+
TPathId GetTableId() const { return TableId; }
void AddShardLock(TLockInfo* lock);
@@ -608,6 +614,8 @@ struct TLocksUpdate {
bool BreakOwn = false;
+ ~TLocksUpdate();
+
bool HasLocks() const {
return bool(AffectedTables) || bool(ReadConflictLocks) || bool(WriteConflictLocks);
}
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 7ceefc319f..c24dc99b0b 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -2338,6 +2338,165 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
}
}
+
+ Y_UNIT_TEST(MvccSnapshotLockedWritesWithReadConflicts) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions()
+ .Shards(1)
+ .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false}}));
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ auto tableId = ResolveTableId(server, sender, "/Root/table-1");
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+ TLockHandle lock2handle(234, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 21, 201)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 2 with tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Read uncommitted rows in tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } Struct { Optional { Uint32: 201 } } } "
+ "} Struct { Bool: false }");
+ observer.Inject = {};
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Read uncommitted rows in tx 234 without value2 column
+ // It should succeed, since result does not depend on tx 123 changes
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 22 } } } "
+ "} Struct { Bool: false }");
+ observer.Inject = {};
+
+ // Read uncommitted rows in tx 234 with the limit 1
+ // It should succeed, since result does not depend on tx 123 changes
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ LIMIT 1
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+ observer.Inject = {};
+
+ // Read uncommitted rows in tx 234 with the value2 column and without a limit
+ // It should abort, since the result for key 2 depends on tx 123 changes
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "ERROR: ABORTED");
+ observer.Inject = {};
+ }
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 546350603c..385189adaf 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -175,6 +175,51 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(),
op->HasKqpAttachedRSFlag() ? nullptr : &op->InReadSets(), dataTx->GetKqpTasks(), tasksRunner, computeCtx);
+ if (!result && computeCtx.HadInconsistentReads()) {
+ LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId
+ << " detected inconsistent reads and is going to abort");
+
+ allocGuard.Release();
+
+ dataTx->ResetCollectedChanges();
+
+ op->SetAbortedFlag();
+
+ // NOTE: we don't actually break locks, and rollback everything
+ // instead, so transaction may continue with different reads that
+ // won't conflict. This should not be considered a feature though,
+ // it's just that actually breaking this (potentially persistent)
+ // lock and rolling back changes will be unnecessarily complicated.
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN);
+
+ // Add a list of "broken" table locks to the result. It may be the
+ // case that the lock is not even set yet (write + read with conflicts),
+ // but we want kqp to have a list of affected tables, which is used
+ // when generating error messages.
+ // TODO: we would want an actual table id that caused inconsistency,
+ // relevant for future multi-table shards only.
+ // NOTE: generation may not match an existing lock, but it's not a problem.
+ for (auto& table : guardLocks.AffectedTables) {
+ Y_VERIFY(guardLocks.LockTxId);
+ op->Result()->AddTxLock(
+ guardLocks.LockTxId,
+ DataShard.TabletID(),
+ DataShard.Generation(),
+ Max<ui64>(),
+ table.GetTableId().OwnerId,
+ table.GetTableId().LocalPathId);
+ }
+
+ tx->ReleaseTxData(txc, ctx);
+
+ // Transaction may have made some changes before it detected
+ // inconsistency, so we need to roll them back. We do this by
+ // marking transaction for reschedule and restarting. The next
+ // cycle will detect aborted operation and move along.
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+
if (!result && computeCtx.IsTabletNotReady()) {
allocGuard.Release();
return OnTabletNotReady(*tx, *dataTx, txc, ctx);
@@ -196,6 +241,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode();
KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result());
} catch (const TMemoryLimitExceededException&) {
+ dataTx->ResetCollectedChanges();
+
txc.NotEnoughMemory();
LOG_T("Operation " << *op << " at " << tabletId
@@ -264,6 +311,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx,
DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY);
+ dataTx.ResetCollectedChanges();
+
ui64 pageFaultCount = tx.IncrementPageFaultCount();
dataTx.GetKqpComputeCtx().PinPages(dataTx.TxInfo().Keys, pageFaultCount);