aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-09-26 15:48:09 +0300
committersnaury <snaury@ydb.tech>2022-09-26 15:48:09 +0300
commit4444d6973c1c9b0d3f541b3bb1f00d6b8f6a11ed (patch)
treea0a08c2f2c718d621ae2ee182470b7dd6c6cc7bf
parent823faff51b40346fc056bf0c3c26fe8969ccbc44 (diff)
downloadydb-4444d6973c1c9b0d3f541b3bb1f00d6b8f6a11ed.tar.gz
Break uncommitted write conflicts in bulk write operations
-rw-r--r--ydb/core/tx/datashard/datashard.cpp57
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h14
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp106
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp8
6 files changed, 205 insertions, 0 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 3540e87017..887d71410d 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -3271,6 +3271,63 @@ void SendViaSession(const TActorId& sessionId,
TActivationContext::Send(ev.Release());
}
+class TBreakWriteConflictsTxObserver : public NTable::ITransactionObserver {
+public:
+ TBreakWriteConflictsTxObserver(TDataShard* self)
+ : Self(self)
+ {
+ }
+
+ void OnSkipUncommitted(ui64 txId) override {
+ Self->SysLocksTable().BreakLock(txId);
+ }
+
+ void OnSkipCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnSkipCommitted(const TRowVersion&, ui64) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&, ui64) override {
+ // nothing
+ }
+
+private:
+ TDataShard* Self;
+};
+
+bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells) {
+ const auto localTid = GetLocalTableId(tableId);
+ Y_VERIFY(localTid);
+ const NTable::TScheme& scheme = db.GetScheme();
+ const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(localTid);
+ TSmallVec<TRawTypeValue> key;
+ NMiniKQL::ConvertTableKeys(scheme, tableInfo, keyCells, key, nullptr);
+
+ if (!BreakWriteConflictsTxObserver) {
+ BreakWriteConflictsTxObserver = new TBreakWriteConflictsTxObserver(this);
+ }
+
+ // We are not actually interested in the row version, we only need to
+ // detect uncommitted transaction skips on the path to that version.
+ auto res = db.SelectRowVersion(
+ localTid, key, /* readFlags */ 0,
+ nullptr,
+ BreakWriteConflictsTxObserver);
+
+ if (res.Ready == NTable::EReady::Page) {
+ return false;
+ }
+
+ return true;
+}
+
} // NDataShard
TString TEvDataShard::TEvRead::ToString() const {
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index f94c60bb11..f9506c55f6 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -57,6 +57,8 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
const bool readForTableShadow = writeToTableShadow && !shadowTableId;
const ui32 writeTableId = writeToTableShadow && shadowTableId ? shadowTableId : localTableId;
+ const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId);
+
if (CollectChanges) {
ChangeCollector.Reset(CreateChangeCollector(*self, txc.DB, tableInfo, true));
}
@@ -188,6 +190,16 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
if (BreakLocks) {
+ if (breakWriteConflicts) {
+ if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) {
+ pageFault = true;
+ }
+
+ if (pageFault) {
+ continue;
+ }
+ }
+
self->SysLocksTable().BreakLock(fullTableId, keyCells.GetCells());
}
}
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 422b64db5e..060c9e79be 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -75,6 +75,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
}
+ const bool breakWriteConflicts = self->SysLocksTable().HasWriteLocks(fullTableId);
+
bool pageFault = false;
for (const auto& serializedKey : request.GetKeyColumns()) {
TSerializedCellVec keyCells;
@@ -142,6 +144,12 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
}
+ if (breakWriteConflicts) {
+ if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells())) {
+ pageFault = true;
+ }
+ }
+
if (pageFault) {
continue;
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 1a98d9ab77..c2332ff3b9 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1593,6 +1593,18 @@ public:
void SubscribeNewLocks(const TActorContext &ctx);
void SubscribeNewLocks();
+ /**
+ * Breaks uncommitted write locks at the specified key
+ *
+ * Prerequisites: TSetupSysLocks is active and caller does not have any
+ * uncommitted write locks.
+ * Note: the specified table should have some write locks, otherwise
+ * this call is a very expensive no-op.
+ *
+ * Returns true on success and false on page fault.
+ */
+ bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells);
+
private:
///
class TLoanReturnTracker {
@@ -2313,6 +2325,8 @@ private:
TReadIteratorsMap ReadIterators;
THashMap<TActorId, TReadIteratorSession> ReadIteratorSessions;
+ NTable::ITransactionObserverPtr BreakWriteConflictsTxObserver;
+
protected:
// Redundant init state required by flat executor implementation
void StateInit(TAutoPtr<NActors::IEventHandle> &ev, const NActors::TActorContext &ctx) {
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index c24dc99b0b..0cd1e6aec2 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/formats/factory.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/tx_proxy/upload_rows.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
@@ -2497,6 +2498,111 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"ERROR: ABORTED");
observer.Inject = {};
}
+
+ Y_UNIT_TEST(LockedWriteBulkUpsertConflict) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write to key 2 using bulk upsert
+ {
+ using TRows = TVector<std::pair<TSerializedCellVec, TString>>;
+ using TRowTypes = TVector<std::pair<TString, Ydb::Type>>;
+
+ auto types = std::make_shared<TRowTypes>();
+
+ Ydb::Type type;
+ type.set_type_id(Ydb::Type::UINT32);
+ types->emplace_back("key", type);
+ types->emplace_back("value", type);
+
+ auto rows = std::make_shared<TRows>();
+
+ TVector<TCell> key{ TCell::Make(ui32(2)) };
+ TVector<TCell> values{ TCell::Make(ui32(22)) };
+ TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key));
+ TString serializedValues(TSerializedCellVec::Serialize(values));
+ rows->emplace_back(serializedKey, serializedValues);
+
+ auto upsertSender = runtime.AllocateEdgeActor();
+ auto actor = NTxProxy::CreateUploadRowsInternal(upsertSender, "/Root/table-1", types, rows);
+ runtime.Register(actor);
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(upsertSender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, Ydb::StatusIds::SUCCESS);
+ }
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "ERROR: ABORTED");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+ }
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index d15c7e551c..f2c6bc2f80 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -100,6 +100,8 @@ public:
Y_VERIFY(DataShard.GetUserTables().contains(tableId));
const TUserTable& tableInfo = *DataShard.GetUserTables().at(tableId);
+ const bool breakWriteConflicts = DataShard.SysLocksTable().HasWriteLocks(fullTableId);
+
size_t row = 0;
bool pageFault = false;
Y_FOR_EACH_BIT(i, presentRows) {
@@ -128,6 +130,12 @@ public:
}
}
+ if (breakWriteConflicts) {
+ if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) {
+ pageFault = true;
+ }
+ }
+
if (pageFault) {
continue;
}