about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2022-11-08 15:11:12 +0300
committer snaury <snaury@ydb.tech> 2022-11-08 15:11:12 +0300
commit b35fb596af217a9bfca255332f929cf6a92f2c20 (patch)
tree 6c4f1c31483493c1f0cc9728e724fab502a20948
parent acddd42666acd4b31e3232075fff25f9b13a8c00 (diff)
download ydb-b35fb596af217a9bfca255332f929cf6a92f2c20.tar.gz
Limit the number of allowed changes per key
-rw-r--r-- ydb/core/protos/config.proto 5
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 2
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.cpp 87
-rw-r--r-- ydb/core/tx/datashard/datashard__engine_host.h 2
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h 6
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_snapshot.cpp 150
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp 30
7 files changed, 266 insertions, 16 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 0f42c3b819d..ad86a339961 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1305,6 +1305,11 @@ message TImmediateControlsConfig {
MinValue: 0,
MaxValue: 1,
DefaultValue: 0 }];
+ optional uint64 MaxLockedWritesPerKey = 15 [(ControlOptions) = {
+ Description: "Maximum number of uncommitted locked writes per key",
+ MinValue: 0,
+ MaxValue: 1000000,
+ DefaultValue: 1000 }];
}
message TTxLimitControls {
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index eeb299a46b3..798d255c883 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -146,6 +146,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, EnablePrioritizedMvccSnapshotReads(1, 0, 1)
, EnableUnprotectedMvccSnapshotReads(1, 0, 1)
, EnableLockedWrites(0, 0, 1)
+ , MaxLockedWritesPerKey(1000, 0, 1000000)
, EnableLeaderLeases(1, 0, 1)
, MinLeaderLeaseDurationUs(250000, 1000, 5000000)
, DataShardSysTables(InitDataShardSysTables(this))
@@ -317,6 +318,7 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads");
appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads");
appData->Icb->RegisterSharedControl(EnableLockedWrites, "DataShardControls.EnableLockedWrites");
+ appData->Icb->RegisterSharedControl(MaxLockedWritesPerKey, "DataShardControls.MaxLockedWritesPerKey");
appData->Icb->RegisterSharedControl(EnableLeaderLeases, "DataShardControls.EnableLeaderLeases");
appData->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "DataShardControls.MinLeaderLeaseDurationUs");
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 939a9bbcabc..46b2370d316 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -606,30 +606,54 @@ public:
TSmallVec<TRawTypeValue> key;
ConvertTableKeys(Scheme, tableInfo, row, key, nullptr);
+ ui64 skipCount = 0;
+
+ NTable::ITransactionObserverPtr txObserver;
+ if (LockTxId) {
+ txObserver = new TLockedWriteTxObserver(this, LockTxId, skipCount, localTid);
+ } else {
+ txObserver = new TWriteTxObserver(this);
+ }
+
// We are not actually interested in the row version, we only need to
// detect uncommitted transaction skips on the path to that version.
auto res = Db.SelectRowVersion(
localTid, key, /* readFlags */ 0,
- GetReadTxMap(tableId),
- new TWriteTxObserver(this, tableId));
+ nullptr, txObserver);
if (res.Ready == NTable::EReady::Page) {
throw TNotReadyTabletException();
}
+
+ if (LockTxId) {
+ ui64 skipLimit = Self->GetMaxLockedWritesPerKey();
+ if (skipLimit > 0 && skipCount >= skipLimit) {
+ throw TLockedWriteLimitException();
+ }
+ }
}
- class TWriteTxObserver : public NTable::ITransactionObserver {
+ class TLockedWriteTxObserver : public NTable::ITransactionObserver {
public:
- TWriteTxObserver(const TDataShardEngineHost* host, const TTableId& tableId)
+ TLockedWriteTxObserver(const TDataShardEngineHost* host, ui64 txId, ui64& skipCount, ui32 localTid)
: Host(host)
- , TableId(tableId)
+ , SelfTxId(txId)
+ , SkipCount(skipCount)
+ , LocalTid(localTid)
{
- Y_UNUSED(Host);
- Y_UNUSED(TableId);
}
void OnSkipUncommitted(ui64 txId) override {
- Host->AddWriteConflict(TableId, txId);
+ if (!Host->Db.HasRemovedTx(LocalTid, txId)) {
+ ++SkipCount;
+ if (!SelfFound) {
+ if (txId != SelfTxId) {
+ Host->AddWriteConflict(txId);
+ } else {
+ SelfFound = true;
+ }
+ }
+ }
}
void OnSkipCommitted(const TRowVersion&) override {
@@ -650,16 +674,49 @@ public:
private:
const TDataShardEngineHost* const Host;
- const TTableId TableId;
+ const ui64 SelfTxId;
+ ui64& SkipCount;
+ const ui32 LocalTid;
+ bool SelfFound = false;
};
- void AddWriteConflict(const TTableId& tableId, ui64 txId) const {
- Y_UNUSED(tableId);
- if (LockTxId) {
- Self->SysLocksTable().AddWriteConflict(txId);
- } else {
- Self->SysLocksTable().BreakLock(txId);
+ class TWriteTxObserver : public NTable::ITransactionObserver {
+ public:
+ TWriteTxObserver(const TDataShardEngineHost* host)
+ : Host(host)
+ {
+ }
+
+ void OnSkipUncommitted(ui64 txId) override {
+ Host->BreakWriteConflict(txId);
}
+
+ void OnSkipCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnSkipCommitted(const TRowVersion&, ui64) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&, ui64) override {
+ // nothing
+ }
+
+ private:
+ const TDataShardEngineHost* const Host;
+ };
+
+ void AddWriteConflict(ui64 txId) const {
+ Self->SysLocksTable().AddWriteConflict(txId);
+ }
+
+ void BreakWriteConflict(ui64 txId) const {
+ Self->SysLocksTable().BreakLock(txId);
}
private:
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 9cb91f7f7a2..9d8ac1ba6f3 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -22,6 +22,8 @@ class TDataShard;
TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self);
+class TLockedWriteLimitException : public yexception {};
+
///
class TEngineBay : TNonCopyable {
public:
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index f0ec337de9e..e76cbd4abd2 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1415,6 +1415,11 @@ public:
return value != 0;
}
+ ui64 GetMaxLockedWritesPerKey() const {
+ ui64 value = MaxLockedWritesPerKey;
+ return value;
+ }
+
template <typename T>
void ReleaseCache(T& tx) {
ReleaseTxCache(tx.GetTxCacheUsage());
@@ -2287,6 +2292,7 @@ private:
TControlWrapper EnablePrioritizedMvccSnapshotReads;
TControlWrapper EnableUnprotectedMvccSnapshotReads;
TControlWrapper EnableLockedWrites;
+ TControlWrapper MaxLockedWritesPerKey;
TControlWrapper EnableLeaderLeases;
TControlWrapper MinLeaderLeaseDurationUs;
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index c9f66982695..a1dfa154089 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1411,6 +1411,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
ui64 Counter;
ui64 SchemeShard;
ui64 PathId;
+
+ friend bool operator==(const TLockInfo& a, const TLockInfo& b) = default;
};
struct TInjectLocks {
@@ -3457,6 +3459,154 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
observer.InjectLocks.reset();
}
+ Y_UNIT_TEST(LockedWritesLimitedPerKey) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+ controls.MutableDataShardControls()->SetMaxLockedWritesPerKey(2);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ std::optional<TLockHandle> lock1handle(std::in_place, 123, runtime.GetActorSystem(0));
+ std::optional<TLockHandle> lock2handle(std::in_place, 234, runtime.GetActorSystem(0));
+ std::optional<TLockHandle> lock3handle(std::in_place, 345, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 2 with tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 2 with tx 345
+ observer.Inject.LockId = 345;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 23)
+ )")),
+ "ERROR: GENERIC_ERROR");
+ observer.Inject = {};
+
+ // Abort tx 234, this would allow adding one more change to key 2
+ lock2handle.reset();
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ // Write uncommitted changes to key 2 with tx 345
+ observer.Inject.LockId = 345;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 23)
+ )")),
+ "<empty>");
+ auto locks3 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to key 3 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 31)
+ )")),
+ "<empty>");
+ UNIT_ASSERT(locks1 == observer.LastLocks);
+ observer.Inject = {};
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Commit changes in tx 345
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks3;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Check table has those changes visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 23 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 31 } }");
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 91c39564528..c3f5130ddde 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -280,7 +280,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
DataShard.AddLockChangeRecords(guardLocks.LockTxId, std::move(changes));
}
} else {
- // FIXME: handle lock changes commit
op->ChangeRecords() = std::move(changes);
}
@@ -305,6 +304,35 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
return EExecutionStatus::Restart;
} catch (const TNotReadyTabletException&) {
return OnTabletNotReady(*tx, *dataTx, txc, ctx);
+ } catch (const TLockedWriteLimitException&) {
+ dataTx->ResetCollectedChanges();
+
+ op->SetAbortedFlag();
+
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR);
+
+ op->Result()->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED,
+ TStringBuilder() << "Shard " << DataShard.TabletID() << " cannot write more uncommitted changes");
+
+ for (auto& table : guardLocks.AffectedTables) {
+ Y_VERIFY(guardLocks.LockTxId);
+ op->Result()->AddTxLock(
+ guardLocks.LockTxId,
+ DataShard.TabletID(),
+ DataShard.Generation(),
+ Max<ui64>(),
+ table.GetTableId().OwnerId,
+ table.GetTableId().LocalPathId);
+ }
+
+ tx->ReleaseTxData(txc, ctx);
+
+ // Transaction may have made some changes before it hit the limit,
+ // so we need to roll them back. We do this by marking transaction for
+ // reschedule and restarting. The next cycle will detect aborted
+ // operation and move along.
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
} catch (const yexception& e) {
LOG_C("Exception while executing KQP transaction " << *op << " at " << tabletId << ": " << e.what());
if (op->IsReadOnly() || op->IsImmediate()) {