diff options
author | snaury <snaury@ydb.tech> | 2022-09-23 16:58:42 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-09-23 16:58:42 +0300 |
commit | 7c520dd48df863dfd949097ee2842a4781031d31 (patch) | |
tree | 18202e1246d1a7bbcad498d7bec8824186b38f3c | |
parent | a155c78ca37fc2f240ed541cbb1ba56b631bd9e2 (diff) | |
download | ydb-7c520dd48df863dfd949097ee2842a4781031d31.tar.gz |
Handle read conflicts in kqp transactions
-rw-r--r-- | ydb/core/tablet_flat/flat_mem_iter.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_read_table.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.cpp | 42 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 159 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 49 |
14 files changed, 306 insertions, 17 deletions
diff --git a/ydb/core/tablet_flat/flat_mem_iter.h b/ydb/core/tablet_flat/flat_mem_iter.h index 9d809ef3ad..5de8862b8c 100644 --- a/ydb/core/tablet_flat/flat_mem_iter.h +++ b/ydb/core/tablet_flat/flat_mem_iter.h @@ -206,6 +206,9 @@ namespace NTable { ApplyColumn(row, up); } } + if (isDelta && row.IsFinalized()) { + break; + } } else { transactionObserver.OnSkipUncommitted(update->RowVersion.TxId); } diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp index ba28e01ba2..27253f6eca 100644 --- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp @@ -99,6 +99,9 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac if (dataTx->GetKqpComputeCtx().HasPersistentChannels()) { auto result = KqpRunTransaction(ctx, op->GetTxId(), dataTx->GetKqpTasks(), tasksRunner); + Y_VERIFY_S(!dataTx->GetKqpComputeCtx().HadInconsistentReads(), + "Unexpected inconsistent reads in operation " << *op << " when preparing persistent channels"); + if (result == NYql::NDq::ERunStatus::PendingInput && dataTx->GetKqpComputeCtx().IsTabletNotReady()) { allocGuard.Release(); return OnTabletNotReady(*tx, *dataTx, txc, ctx); diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 294b14beed..41f3173286 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -280,13 +280,14 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) { /// class TDataShardEngineHost : public TEngineHost { public: - TDataShardEngineHost(TDataShard* self, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now) + TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now) : 
TEngineHost(db, counters, TEngineHostSettings(self->TabletID(), (self->State == TShardState::Readonly || self->State == TShardState::Frozen), self->ByKeyFilterDisabled(), self->GetKeyAccessSampler())) , Self(self) + , EngineBay(engineBay) , DB(db) , LockTxId(lockTxId) , LockNodeId(lockNodeId) @@ -351,6 +352,14 @@ public: return total; } + void ResetCollectedChanges() { + for (auto& pr : ChangeCollectors) { + if (pr.second) { + pr.second->Reset(); + } + } + } + bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override { if (TSysTables::IsSystemTable(key.TableId)) return DataShardSysTable(key.TableId).IsValidKey(key); @@ -620,6 +629,7 @@ public: // and that future conflict, hence we must break locks and abort. // TODO: add an actual abort Self->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId); + EngineBay.GetKqpComputeCtx().SetInconsistentReads(); } } @@ -698,6 +708,7 @@ private: } TDataShard* Self; + TEngineBay& EngineBay; NTable::TDatabase& DB; const ui64& LockTxId; const ui32& LockNodeId; @@ -720,7 +731,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor , LockNodeId(0) { auto now = TAppData::TimeProvider->Now(); - EngineHost = MakeHolder<TDataShardEngineHost>(self, txc.DB, EngineHostCounters, LockTxId, LockNodeId, now); + EngineHost = MakeHolder<TDataShardEngineHost>(self, *this, txc.DB, EngineHostCounters, LockTxId, LockNodeId, now); EngineSettings = MakeHolder<TEngineFlatSettings>(IEngineFlat::EProtocol::V1, AppData(ctx)->FunctionRegistry, *TAppData::RandomProvider, *TAppData::TimeProvider, EngineHost.Get(), self->AllocCounters); @@ -900,6 +911,11 @@ TVector<IChangeCollector::TChange> TEngineBay::GetCollectedChanges() const { return host->GetCollectedChanges(); } +void TEngineBay::ResetCollectedChanges() { + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + host->ResetCollectedChanges(); +} + IEngineFlat * TEngineBay::GetEngine() { if (!Engine) { Engine = 
CreateEngineFlat(*EngineSettings); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index a960da0c9f..06ddf773c1 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -101,6 +101,7 @@ public: void SetIsRepeatableSnapshot(); TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const; + void ResetCollectedChanges(); void ResetCounters() { EngineHostCounters = TEngineHostCounters(); } const TEngineHostCounters& GetCounters() const { return EngineHostCounters; } diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index d206e7d0bf..f2b882e785 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -181,6 +181,7 @@ public: void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); } TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const { return EngineBay.GetCollectedChanges(); } + void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); } TActorId Source() const { return Source_; } void SetSource(const TActorId& actorId) { Source_ = actorId; } diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index ad3be7a5f3..92fbe03e4b 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -478,6 +478,10 @@ THolder<TEvDataShard::TEvProposeTransactionResult> KqpCompleteTransaction(const { auto runStatus = RunKqpTransactionInternal(ctx, txId, inReadSets, tasks, tasksRunner, /* applyEffects */ true); + if (computeCtx.HadInconsistentReads()) { + return nullptr; + } + if (runStatus == NYql::NDq::ERunStatus::PendingInput && computeCtx.IsTabletNotReady()) { return nullptr; } diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp 
b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index 2db6e83517..e85c6f81b9 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -400,14 +400,15 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con EngineHost.GetReadTxMap(tableId), EngineHost.GetReadTxObserver(tableId)); + if (InconsistentReads) { + return false; + } + kqpStats.NSelectRow = 1; kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips; switch (ready) { case EReady::Page: - if (auto collector = EngineHost.GetChangeCollector(tableId)) { - collector->Reset(); - } SetTabletNotReady(); return false; case EReady::Gone: @@ -485,6 +486,10 @@ bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTabl const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats) { while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) { + if (InconsistentReads) { + return false; + } + TDbTupleRef rowKey = iterator.GetKey(); MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount); @@ -521,9 +526,6 @@ bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTabl } if (iterator.Last() == NTable::EReady::Page) { - if (auto collector = EngineHost.GetChangeCollector(tableId)) { - collector->Reset(); - } SetTabletNotReady(); } @@ -536,6 +538,10 @@ bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TRead NUdf::TUnboxedValue* const* result, TKqpTableStats& stats) { while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) { + if (InconsistentReads) { + return false; + } + TDbTupleRef rowKey = iterator.GetKey(); MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount); @@ -572,9 +578,6 @@ bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TRead } if (iterator.Last() == NTable::EReady::Page) { - if (auto collector = EngineHost.GetChangeCollector(tableId)) { - collector->Reset(); - } SetTabletNotReady(); } 
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index d547030db8..3dbea628f3 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -83,6 +83,9 @@ public: const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, NUdf::TUnboxedValue* const* result, TKqpTableStats& stats); + bool HadInconsistentReads() const { return InconsistentReads; } + void SetInconsistentReads() { InconsistentReads = true; } + private: void TouchTableRange(const TTableId& tableId, const TTableRange& range) const; void TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const; @@ -97,7 +100,7 @@ private: const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, NUdf::TUnboxedValue* const* result, TKqpTableStats& stats); - void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; }; + void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; } public: NTable::TDatabase* Database = nullptr; @@ -111,6 +114,7 @@ private: ui32 LockNodeId = 0; bool PersistentChannels = false; bool TabletNotReady = false; + bool InconsistentReads = false; TRowVersion ReadVersion = TRowVersion::Min(); THashMap<std::pair<ui64, ui64>, TActorId> OutputChannels; }; diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp index e1938769fd..8cd0d4de4a 100644 --- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp @@ -127,7 +127,7 @@ public: return std::move(result); } - if (ComputeCtx.IsTabletNotReady()) { + if (ComputeCtx.IsTabletNotReady() || ComputeCtx.HadInconsistentReads()) { return NUdf::TUnboxedValue::MakeYield(); } @@ -234,7 +234,7 @@ public: return result; } - if (ComputeCtx.IsTabletNotReady()) { + if (ComputeCtx.IsTabletNotReady() || 
ComputeCtx.HadInconsistentReads()) { return NUdf::TUnboxedValue::MakeYield(); } diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp index c57a8a3840..2054daaade 100644 --- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp @@ -225,7 +225,7 @@ protected: return EFetchResult::One; } - if (ComputeCtx.IsTabletNotReady()) { + if (ComputeCtx.IsTabletNotReady() || ComputeCtx.HadInconsistentReads()) { return EFetchResult::Yield; } diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index 428341444d..cffc2a0340 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -689,6 +689,28 @@ void TLockLocker::SaveBrokenPersistentLocks(ILocksDb* db) { } } +// TLocksUpdate + +TLocksUpdate::~TLocksUpdate() { + auto cleanList = [](auto& list) { + while (!list.Empty()) { + list.PopFront(); + } + }; + + // We clean all lists to make sure items are not left linked together + cleanList(ReadTables); + cleanList(WriteTables); + cleanList(AffectedTables); + cleanList(BreakLocks); + cleanList(BreakShardLocks); + cleanList(BreakAllLocks); + cleanList(ReadConflictLocks); + cleanList(WriteConflictLocks); + cleanList(WriteConflictShardLocks); + cleanList(EraseLocks); +} + // TSysLocks TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { @@ -1038,16 +1060,32 @@ bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const { } bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const { + if (Update && Update->LockTxId == lockId && Update->WriteTables) { + if (auto* table = Locker.FindTablePtr(tableId.PathId)) { + if (table->IsInList<TTableLocksWriteListTag>()) { + return true; + } + } + } + if (auto* lock = Locker.FindLockPtr(lockId)) { - return lock->WriteTables.contains(tableId.PathId); + if (lock->WriteTables.contains(tableId.PathId)) { + return true; + } } 
return false; } bool TSysLocks::HasWriteLocks(const TTableId& tableId) const { + if (Update && Update->WriteTables) { + return true; + } + if (auto* table = Locker.FindTablePtr(tableId.PathId)) { - return !table->WriteLocks.empty(); + if (!table->WriteLocks.empty()) { + return true; + } } return false; diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index 4bf66b34b9..d33a977756 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -377,6 +377,12 @@ public: : TableId(tableId) {} + template<class TTag> + bool IsInList() const { + using TItem = TIntrusiveListItem<TTableLocks, TTag>; + return !static_cast<const TItem*>(this)->Empty(); + } + TPathId GetTableId() const { return TableId; } void AddShardLock(TLockInfo* lock); @@ -608,6 +614,8 @@ struct TLocksUpdate { bool BreakOwn = false; + ~TLocksUpdate(); + bool HasLocks() const { return bool(AffectedTables) || bool(ReadConflictLocks) || bool(WriteConflictLocks); } diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 7ceefc319f..c24dc99b0b 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -2338,6 +2338,165 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } } } + + Y_UNIT_TEST(MvccSnapshotLockedWritesWithReadConflicts) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = 
new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions() + .Shards(1) + .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false}})); + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value, value2 FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + TLockHandle lock2handle(234, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 21, 201) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 2 with tx 234 + 
observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Read uncommitted rows in tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value, value2 FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } Struct { Optional { Uint32: 201 } } } " + "} Struct { Bool: false }"); + observer.Inject = {}; + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Read uncommitted rows in tx 234 without value2 column + // It should succeed, since result does not depend on tx 123 changes + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 22 } } } " + "} Struct { Bool: false }"); + observer.Inject = {}; + + // Read uncommitted rows in tx 234 with the limit 1 + // It 
should succeed, since result does not depend on tx 123 changes + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value, value2 FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + LIMIT 1 + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + observer.Inject = {}; + + // Read uncommitted rows in tx 234 with the value2 column and without a limit + // It should abort, since the result for key 2 would depend on tx 123 changes + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value, value2 FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "ERROR: ABORTED"); + observer.Inject = {}; + } } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 546350603c..385189adaf 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -175,6 +175,51 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(), op->HasKqpAttachedRSFlag() ? 
nullptr : &op->InReadSets(), dataTx->GetKqpTasks(), tasksRunner, computeCtx); + if (!result && computeCtx.HadInconsistentReads()) { + LOG_T("Operation " << *op << " (execute_kqp_data_tx) at " << tabletId + << " detected inconsistent reads and is going to abort"); + + allocGuard.Release(); + + dataTx->ResetCollectedChanges(); + + op->SetAbortedFlag(); + + // NOTE: we don't actually break locks, and rollback everything + // instead, so transaction may continue with different reads that + // won't conflict. This should not be considered a feature though, + // it's just that actually breaking this (potentially persistent) + // lock and rolling back changes will be unnecessarily complicated. + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::LOCKS_BROKEN); + + // Add a list of "broken" table locks to the result. It may be the + // case that the lock is not even set yet (write + read with conflicts), + // but we want kqp to have a list of affected tables, which is used + // when generating error messages. + // TODO: we would want an actual table id that caused inconsistency, + // relevant for future multi-table shards only. + // NOTE: generation may not match an existing lock, but it's not a problem. + for (auto& table : guardLocks.AffectedTables) { + Y_VERIFY(guardLocks.LockTxId); + op->Result()->AddTxLock( + guardLocks.LockTxId, + DataShard.TabletID(), + DataShard.Generation(), + Max<ui64>(), + table.GetTableId().OwnerId, + table.GetTableId().LocalPathId); + } + + tx->ReleaseTxData(txc, ctx); + + // Transaction may have made some changes before it detected + // inconsistency, so we need to roll them back. We do this by + // marking transaction for reschedule and restarting. The next + // cycle will detect aborted operation and move along. 
+ txc.Reschedule(); + return EExecutionStatus::Restart; + } + if (!result && computeCtx.IsTabletNotReady()) { allocGuard.Release(); return OnTabletNotReady(*tx, *dataTx, txc, ctx); @@ -196,6 +241,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode(); KqpFillStats(DataShard, tasksRunner, computeCtx, statsMode, *op->Result()); } catch (const TMemoryLimitExceededException&) { + dataTx->ResetCollectedChanges(); + txc.NotEnoughMemory(); LOG_T("Operation " << *op << " at " << tabletId @@ -264,6 +311,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::OnTabletNotReady(TActiveTransaction& tx, DataShard.IncCounter(COUNTER_TX_TABLET_NOT_READY); + dataTx.ResetCollectedChanges(); + ui64 pageFaultCount = tx.IncrementPageFaultCount(); dataTx.GetKqpComputeCtx().PinPages(dataTx.TxInfo().Keys, pageFaultCount); |