diff options
author | snaury <snaury@ydb.tech> | 2022-09-26 15:48:09 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-09-26 15:48:09 +0300 |
commit | 4444d6973c1c9b0d3f541b3bb1f00d6b8f6a11ed (patch) | |
tree | a0a08c2f2c718d621ae2ee182470b7dd6c6cc7bf | |
parent | 823faff51b40346fc056bf0c3c26fe8969ccbc44 (diff) | |
download | ydb-4444d6973c1c9b0d3f541b3bb1f00d6b8f6a11ed.tar.gz |
Break uncommitted write conflicts in bulk write operations
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 57 |
-rw-r--r-- | ydb/core/tx/datashard/datashard_common_upload.cpp | 12 |
-rw-r--r-- | ydb/core/tx/datashard/datashard_direct_erase.cpp | 8 |
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 14 |
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 106 |
-rw-r--r-- | ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp | 8 |
6 files changed, 205 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 3540e87017..887d71410d 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -3271,6 +3271,63 @@ void SendViaSession(const TActorId& sessionId,
     TActivationContext::Send(ev.Release());
 }
 
+class TBreakWriteConflictsTxObserver : public NTable::ITransactionObserver {
+public:
+    TBreakWriteConflictsTxObserver(TDataShard* self)
+        : Self(self)
+    {
+    }
+
+    void OnSkipUncommitted(ui64 txId) override {
+        Self->SysLocksTable().BreakLock(txId); // break the lock of the skipped uncommitted tx
+    }
+
+    void OnSkipCommitted(const TRowVersion&) override {
+        // nothing: only uncommitted skips break locks
+    }
+
+    void OnSkipCommitted(const TRowVersion&, ui64) override {
+        // nothing: only uncommitted skips break locks
+    }
+
+    void OnApplyCommitted(const TRowVersion&) override {
+        // nothing: only uncommitted skips break locks
+    }
+
+    void OnApplyCommitted(const TRowVersion&, ui64) override {
+        // nothing: only uncommitted skips break locks
+    }
+
+private:
+    TDataShard* Self;
+};
+
+bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells) {
+    const auto localTid = GetLocalTableId(tableId);
+    Y_VERIFY(localTid);
+    const NTable::TScheme& scheme = db.GetScheme();
+    const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(localTid);
+    TSmallVec<TRawTypeValue> key;
+    NMiniKQL::ConvertTableKeys(scheme, tableInfo, keyCells, key, nullptr);
+
+    if (!BreakWriteConflictsTxObserver) { // created lazily on first use and cached in the member
+        BreakWriteConflictsTxObserver = new TBreakWriteConflictsTxObserver(this);
+    }
+
+    // We are not actually interested in the row version; we only need the
+    // observer to see uncommitted transaction skips on the path to that version.
+    auto res = db.SelectRowVersion(
+        localTid, key, /* readFlags */ 0,
+        nullptr,
+        BreakWriteConflictsTxObserver);
+
+    if (res.Ready == NTable::EReady::Page) {
+        return false; // page fault
+    }
+
+    return true; // lookup finished without a page fault
+}
+
 } // NDataShard
 
 TString TEvDataShard::TEvRead::ToString() const {
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index f94c60bb11..f9506c55f6 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -57,6 +57,8 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
     const bool readForTableShadow = writeToTableShadow && !shadowTableId;
     const ui32 writeTableId = writeToTableShadow && shadowTableId ? shadowTableId : localTableId;
 
+    const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId);
+
     if (CollectChanges) {
         ChangeCollector.Reset(CreateChangeCollector(*self, txc.DB, tableInfo, true));
     }
@@ -188,6 +190,16 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
         }
 
         if (BreakLocks) {
+            if (breakWriteConflicts) {
+                if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) {
+                    pageFault = true;
+                }
+
+                if (pageFault) {
+                    continue;
+                }
+            }
+
             self->SysLocksTable().BreakLock(fullTableId, keyCells.GetCells());
         }
     }
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 422b64db5e..060c9e79be 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -75,6 +75,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
         }
     }
 
+    const bool breakWriteConflicts = self->SysLocksTable().HasWriteLocks(fullTableId);
+
     bool pageFault = false;
     for (const auto& serializedKey : request.GetKeyColumns()) {
         TSerializedCellVec keyCells;
@@ -142,6 +144,12 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
             }
         }
 
+        if (breakWriteConflicts) {
+            if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells())) {
+                pageFault = true;
+            }
+        }
+
         if (pageFault) {
             continue;
         }
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 1a98d9ab77..c2332ff3b9 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1593,6 +1593,18 @@ public:
     void SubscribeNewLocks(const TActorContext &ctx);
     void SubscribeNewLocks();
 
+    /**
+     * Breaks uncommitted write locks at the specified key.
+     *
+     * Prerequisites: TSetupSysLocks is active and the caller does not have any
+     * uncommitted write locks.
+     * Note: the specified table should have some write locks, otherwise
+     * this call is a very expensive no-op.
+     *
+     * Returns true on success and false on page fault.
+     */
+    bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells);
+
 private:
     ///
     class TLoanReturnTracker {
@@ -2313,6 +2325,8 @@ private:
     TReadIteratorsMap ReadIterators;
     THashMap<TActorId, TReadIteratorSession> ReadIteratorSessions;
 
+    NTable::ITransactionObserverPtr BreakWriteConflictsTxObserver;
+
 protected:
     // Redundant init state required by flat executor implementation
     void StateInit(TAutoPtr<NActors::IEventHandle> &ev, const NActors::TActorContext &ctx) {
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index c24dc99b0b..0cd1e6aec2 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -5,6 +5,7 @@
 #include <ydb/core/formats/factory.h>
 #include <ydb/core/tx/long_tx_service/public/lock_handle.h>
 #include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/tx_proxy/upload_rows.h>
 
 #include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
 
@@ -2497,6 +2498,111 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
             "ERROR: ABORTED");
         observer.Inject = {};
     }
+
+    Y_UNIT_TEST(LockedWriteBulkUpsertConflict) {
+        constexpr bool UseNewEngine = true;
+
+        TPortManager pm;
+        TServerSettings::TControls controls;
+        controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+        controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+        controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableMvcc(true)
+            .SetEnableMvccSnapshotReads(true)
+            .SetEnableKqpSessionActor(UseNewEngine)
+            .SetControls(controls);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+        ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+        SimulateSleep(server, TDuration::Seconds(1));
+
+        TInjectLockSnapshotObserver observer(runtime);
+
+        // Start a snapshot read transaction
+        TString sessionId, txId;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+                SELECT key, value FROM `/Root/table-1`
+                WHERE key >= 1 AND key <= 3
+                ORDER BY key
+                )")),
+            "Struct { "
+            "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+            "} Struct { Bool: false }");
+
+        // We will reuse this snapshot
+        auto snapshot = observer.Last.MvccSnapshot;
+
+        using NLongTxService::TLockHandle;
+        TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+        // Write uncommitted changes to key 2 with tx 123
+        observer.Inject.LockId = 123;
+        observer.Inject.LockNodeId = runtime.GetNodeId(0);
+        observer.Inject.MvccSnapshot = snapshot;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, Q_(R"(
+                UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+                )")),
+            "<empty>");
+        auto locks1 = observer.LastLocks;
+        observer.Inject = {};
+
+        // Write to key 2 using bulk upsert
+        {
+            using TRows = TVector<std::pair<TSerializedCellVec, TString>>;
+            using TRowTypes = TVector<std::pair<TString, Ydb::Type>>;
+
+            auto types = std::make_shared<TRowTypes>();
+
+            Ydb::Type type;
+            type.set_type_id(Ydb::Type::UINT32);
+            types->emplace_back("key", type);
+            types->emplace_back("value", type);
+
+            auto rows = std::make_shared<TRows>();
+
+            TVector<TCell> key{ TCell::Make(ui32(2)) };
+            TVector<TCell> values{ TCell::Make(ui32(22)) };
+            TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key));
+            TString serializedValues(TSerializedCellVec::Serialize(values));
+            rows->emplace_back(serializedKey, serializedValues);
+
+            auto upsertSender = runtime.AllocateEdgeActor();
+            auto actor = NTxProxy::CreateUploadRowsInternal(upsertSender, "/Root/table-1", types, rows);
+            runtime.Register(actor);
+
+            auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(upsertSender);
+            UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, Ydb::StatusIds::SUCCESS);
+        }
+
+        // Commit changes in tx 123; the asserted result is ABORTED
+        observer.InjectClearTasks = true;
+        observer.InjectLocks.emplace().Locks = locks1;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, Q_(R"(
+                UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+                )")),
+            "ERROR: ABORTED");
+        observer.InjectClearTasks = false;
+        observer.InjectLocks.reset();
+    }
 }
 
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index d15c7e551c..f2c6bc2f80 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -100,6 +100,8 @@ public:
         Y_VERIFY(DataShard.GetUserTables().contains(tableId));
         const TUserTable& tableInfo = *DataShard.GetUserTables().at(tableId);
 
+        const bool breakWriteConflicts = DataShard.SysLocksTable().HasWriteLocks(fullTableId);
+
         size_t row = 0;
         bool pageFault = false;
         Y_FOR_EACH_BIT(i, presentRows) {
@@ -128,6 +130,12 @@ public:
                 }
             }
 
+            if (breakWriteConflicts) {
+                if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) {
+                    pageFault = true;
+                }
+            }
+
             if (pageFault) {
                 continue;
             }
|