diff options
author | snaury <snaury@ydb.tech> | 2022-11-08 15:11:12 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-11-08 15:11:12 +0300 |
commit | b35fb596af217a9bfca255332f929cf6a92f2c20 (patch) | |
tree | 6c4f1c31483493c1f0cc9728e724fab502a20948 | |
parent | acddd42666acd4b31e3232075fff25f9b13a8c00 (diff) | |
download | ydb-b35fb596af217a9bfca255332f929cf6a92f2c20.tar.gz |
Limit the number of allowed changes per key
-rw-r--r-- | ydb/core/protos/config.proto | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 87 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 150 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 30 |
7 files changed, 266 insertions, 16 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 0f42c3b819d..ad86a339961 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1305,6 +1305,11 @@ message TImmediateControlsConfig { MinValue: 0, MaxValue: 1, DefaultValue: 0 }]; + optional uint64 MaxLockedWritesPerKey = 15 [(ControlOptions) = { + Description: "Maximum number of uncommitted locked writes per key", + MinValue: 0, + MaxValue: 1000000, + DefaultValue: 1000 }]; } message TTxLimitControls { diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index eeb299a46b3..798d255c883 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -146,6 +146,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , EnablePrioritizedMvccSnapshotReads(1, 0, 1) , EnableUnprotectedMvccSnapshotReads(1, 0, 1) , EnableLockedWrites(0, 0, 1) + , MaxLockedWritesPerKey(1000, 0, 1000000) , EnableLeaderLeases(1, 0, 1) , MinLeaderLeaseDurationUs(250000, 1000, 5000000) , DataShardSysTables(InitDataShardSysTables(this)) @@ -317,6 +318,7 @@ void TDataShard::IcbRegister() { appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads"); appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads"); appData->Icb->RegisterSharedControl(EnableLockedWrites, "DataShardControls.EnableLockedWrites"); + appData->Icb->RegisterSharedControl(MaxLockedWritesPerKey, "DataShardControls.MaxLockedWritesPerKey"); appData->Icb->RegisterSharedControl(EnableLeaderLeases, "DataShardControls.EnableLeaderLeases"); appData->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "DataShardControls.MinLeaderLeaseDurationUs"); diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 939a9bbcabc..46b2370d316 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -606,30 +606,54 @@ public: TSmallVec<TRawTypeValue> key; ConvertTableKeys(Scheme, tableInfo, row, key, nullptr); + ui64 skipCount = 0; + + NTable::ITransactionObserverPtr txObserver; + if (LockTxId) { + txObserver = new TLockedWriteTxObserver(this, LockTxId, skipCount, localTid); + } else { + txObserver = new TWriteTxObserver(this); + } + // We are not actually interested in the row version, we only need to // detect uncommitted transaction skips on the path to that version. auto res = Db.SelectRowVersion( localTid, key, /* readFlags */ 0, - GetReadTxMap(tableId), - new TWriteTxObserver(this, tableId)); + nullptr, txObserver); if (res.Ready == NTable::EReady::Page) { throw TNotReadyTabletException(); } + + if (LockTxId) { + ui64 skipLimit = Self->GetMaxLockedWritesPerKey(); + if (skipLimit > 0 && skipCount >= skipLimit) { + throw TLockedWriteLimitException(); + } + } } - class TWriteTxObserver : public NTable::ITransactionObserver { + class TLockedWriteTxObserver : public NTable::ITransactionObserver { public: - TWriteTxObserver(const TDataShardEngineHost* host, const TTableId& tableId) + TLockedWriteTxObserver(const TDataShardEngineHost* host, ui64 txId, ui64& skipCount, ui32 localTid) : Host(host) - , TableId(tableId) + , SelfTxId(txId) + , SkipCount(skipCount) + , LocalTid(localTid) { - Y_UNUSED(Host); - Y_UNUSED(TableId); } void OnSkipUncommitted(ui64 txId) override { - Host->AddWriteConflict(TableId, txId); + if (!Host->Db.HasRemovedTx(LocalTid, txId)) { + ++SkipCount; + if (!SelfFound) { + if (txId != SelfTxId) { + Host->AddWriteConflict(txId); + } else { + SelfFound = true; + } + } + } } void OnSkipCommitted(const TRowVersion&) override { @@ -650,16 +674,49 @@ public: private: const TDataShardEngineHost* const Host; - const TTableId TableId; + const ui64 SelfTxId; + ui64& SkipCount; + const ui32 LocalTid; + bool SelfFound = false; }; - void AddWriteConflict(const TTableId& tableId, ui64 txId) const { - Y_UNUSED(tableId); - if (LockTxId) { - Self->SysLocksTable().AddWriteConflict(txId); - } else { - Self->SysLocksTable().BreakLock(txId); + class TWriteTxObserver : public NTable::ITransactionObserver { + public: + TWriteTxObserver(const TDataShardEngineHost* host) + : Host(host) + { + } + + void OnSkipUncommitted(ui64 txId) override { + Host->BreakWriteConflict(txId); } + + void OnSkipCommitted(const TRowVersion&) override { + // nothing + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&, ui64) override { + // nothing + } + + private: + const TDataShardEngineHost* const Host; + }; + + void AddWriteConflict(ui64 txId) const { + Self->SysLocksTable().AddWriteConflict(txId); + } + + void BreakWriteConflict(ui64 txId) const { + Self->SysLocksTable().BreakLock(txId); } private: diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 9cb91f7f7a2..9d8ac1ba6f3 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -22,6 +22,8 @@ class TDataShard; TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self); +class TLockedWriteLimitException : public yexception {}; + /// class TEngineBay : TNonCopyable { public: diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index f0ec337de9e..e76cbd4abd2 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1415,6 +1415,11 @@ public: return value != 0; } + ui64 GetMaxLockedWritesPerKey() const { + ui64 value = MaxLockedWritesPerKey; + return value; + } + template <typename T> void ReleaseCache(T& tx) { ReleaseTxCache(tx.GetTxCacheUsage()); @@ -2287,6 +2292,7 @@ private: TControlWrapper EnablePrioritizedMvccSnapshotReads; TControlWrapper EnableUnprotectedMvccSnapshotReads; TControlWrapper EnableLockedWrites; + TControlWrapper MaxLockedWritesPerKey; TControlWrapper EnableLeaderLeases; TControlWrapper MinLeaderLeaseDurationUs; diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index c9f66982695..a1dfa154089 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1411,6 +1411,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { ui64 Counter; ui64 SchemeShard; ui64 PathId; + + friend bool operator==(const TLockInfo& a, const TLockInfo& b) = default; }; struct TInjectLocks { @@ -3457,6 +3459,154 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { observer.InjectLocks.reset(); } + Y_UNIT_TEST(LockedWritesLimitedPerKey) { + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + controls.MutableDataShardControls()->SetMaxLockedWritesPerKey(2); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + std::optional<TLockHandle> lock1handle(std::in_place, 123, runtime.GetActorSystem(0)); + std::optional<TLockHandle> lock2handle(std::in_place, 234, runtime.GetActorSystem(0)); + std::optional<TLockHandle> lock3handle(std::in_place, 345, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 2 with tx 234 + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 2 with tx 345 + observer.Inject.LockId = 345; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 23) + )")), + "ERROR: GENERIC_ERROR"); + observer.Inject = {}; + + // Abort tx 234, this would allow adding one more change to key 2 + lock2handle.reset(); + SimulateSleep(server, TDuration::Seconds(1)); + + // Write uncommitted changes to key 2 with tx 345 + observer.Inject.LockId = 345; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 23) + )")), + "<empty>"); + auto locks3 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to key 3 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 31) + )")), + "<empty>"); + UNIT_ASSERT(locks1 == observer.LastLocks); + observer.Inject = {}; + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Commit changes in tx 345 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks3; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Check table has those changes visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 23 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 31 } }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 91c39564528..c3f5130ddde 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -280,7 +280,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio DataShard.AddLockChangeRecords(guardLocks.LockTxId, std::move(changes)); } } else { - // FIXME: handle lock changes commit op->ChangeRecords() = std::move(changes); } @@ -305,6 +304,35 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio return EExecutionStatus::Restart; } catch (const TNotReadyTabletException&) { return OnTabletNotReady(*tx, *dataTx, txc, ctx); + } catch (const TLockedWriteLimitException&) { + dataTx->ResetCollectedChanges(); + + op->SetAbortedFlag(); + + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR); + + op->Result()->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, + TStringBuilder() << "Shard " << DataShard.TabletID() << " cannot write more uncommitted changes"); + + for (auto& table : guardLocks.AffectedTables) { + Y_VERIFY(guardLocks.LockTxId); + op->Result()->AddTxLock( + guardLocks.LockTxId, + DataShard.TabletID(), + DataShard.Generation(), + Max<ui64>(), + table.GetTableId().OwnerId, + table.GetTableId().LocalPathId); + } + + tx->ReleaseTxData(txc, ctx); + + // Transaction may have made some changes before it hit the limit, + // so we need to roll them back. We do this by marking transaction for + // reschedule and restarting. The next cycle will detect aborted + // operation and move along. + txc.Reschedule(); + return EExecutionStatus::Restart; } catch (const yexception& e) { LOG_C("Exception while executing KQP transaction " << *op << " at " << tabletId << ": " << e.what()); if (op->IsReadOnly() || op->IsImmediate()) { |