aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-09-06 13:58:57 +0300
committersnaury <snaury@ydb.tech>2022-09-06 13:58:57 +0300
commit0a0b987a5acb6f6a7f9a7a11175903af9e61ad63 (patch)
treef17c244af7d5ab54e369d9e25cdea6cbc0de23b1
parent4d2299379e613653fa653e340e899dee02174f57 (diff)
downloadydb-0a0b987a5acb6f6a7f9a7a11175903af9e61ad63.tar.gz
Persistent write locks in datashard,
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp175
-rw-r--r--ydb/core/tx/datashard/datashard__init.cpp13
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp139
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h36
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp33
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.h1
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp30
-rw-r--r--ydb/core/tx/datashard/datashard_locks.cpp849
-rw-r--r--ydb/core/tx/datashard/datashard_locks.h399
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.cpp167
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.h43
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h58
-rw-r--r--ydb/core/tx/datashard/datashard_ut_locks.cpp16
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp323
-rw-r--r--ydb/core/tx/datashard/direct_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp11
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp19
-rw-r--r--ydb/core/tx/datashard/operation.h4
-rw-r--r--ydb/core/tx/datashard/read_iterator.h1
-rw-r--r--ydb/core/tx/datashard/remove_locks.cpp22
-rw-r--r--ydb/core/tx/datashard/setup_sys_locks.h45
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema173
27 files changed, 1989 insertions, 589 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 4ba7b98e24d..62157ed5b3e 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -148,6 +148,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_loans.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_locks_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_locks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_split_dst.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_split_src.cpp
diff --git a/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp
index cd8d3dab69a..0d088881c28 100644
--- a/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp
@@ -2,6 +2,7 @@
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
namespace NKikimr {
namespace NDataShard {
@@ -43,7 +44,8 @@ EExecutionStatus TBuildDataTxOutRSUnit::Execute(TOperation::TPtr op,
TTransactionContext &txc,
const TActorContext &ctx)
{
- TSetupSysLocks guardLocks(op, DataShard);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
index 60311f907ec..ba28e01ba29 100644
--- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
@@ -3,6 +3,7 @@
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
#include <ydb/core/kqp/rm/kqp_rm.h>
@@ -43,7 +44,8 @@ bool TBuildKqpDataTxOutRSUnit::IsReadyToExecute(TOperation::TPtr) const {
EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransactionContext& txc,
const TActorContext& ctx)
{
- TSetupSysLocks guardLocks(op, DataShard);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index d957d70e84d..f1f18dcf3ca 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -408,8 +408,11 @@ public:
return;
}
- // TODO: handle presistent tx locks
- if (!LockTxId) {
+ CheckWriteConflicts(tableId, row);
+
+ if (LockTxId) {
+ Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId);
+ } else {
Self->SysLocksTable().BreakLock(tableId, row);
}
Self->SetTableUpdateTime(tableId, Now);
@@ -463,8 +466,11 @@ public:
return;
}
- // TODO: handle persistent tx locks
- if (!LockTxId) {
+ CheckWriteConflicts(tableId, row);
+
+ if (LockTxId) {
+ Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId);
+ } else {
Self->SysLocksTable().BreakLock(tableId, row);
}
@@ -524,23 +530,166 @@ public:
if (TSysTables::IsSystemTable(tableId) || !LockTxId)
return nullptr;
- // Don't use tx map when we know there's no open tx with the given txId
- if (!DB.HasOpenTx(LocalTableId(tableId), LockTxId)) {
+ // Don't use tx map when we know there's no write lock for a table
+ // Note: currently write lock implies uncommitted changes
+ if (!Self->SysLocksTable().HasWriteLock(LockTxId, tableId)) {
return nullptr;
}
- // Uncommitted changes are visible in all possible snapshots
- // TODO: we need to guarantee no other changes committed between snapshot read and our local changes
- return new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
+ auto& ptr = TxMaps[tableId];
+ if (!ptr) {
+ // Uncommitted changes are visible in all possible snapshots
+ ptr = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
+ }
+
+ return ptr;
}
NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const override {
- if (TSysTables::IsSystemTable(tableId))
+ if (TSysTables::IsSystemTable(tableId) || !LockTxId)
+ return nullptr;
+
+ if (!Self->SysLocksTable().HasWriteLocks(tableId)) {
+ // We don't have any active write locks, so there's nothing we
+ // could possibly conflict with.
return nullptr;
+ }
+
+ auto& ptr = TxObservers[tableId];
+ if (!ptr) {
+ // This observer is supposed to find conflicts
+ ptr = new TReadTxObserver(this, tableId);
+ }
+
+ return ptr;
+ }
+
+ class TReadTxObserver : public NTable::ITransactionObserver {
+ public:
+ TReadTxObserver(const TDataShardEngineHost* host, const TTableId& tableId)
+ : Host(host)
+ , TableId(tableId)
+ {
+ Y_UNUSED(Host);
+ Y_UNUSED(TableId);
+ }
+
+ void OnSkipUncommitted(ui64 txId) override {
+ Host->AddReadConflict(TableId, txId);
+ }
+
+ void OnSkipCommitted(const TRowVersion&) override {
+ // We already use InvisibleRowSkips for these
+ }
+
+ void OnSkipCommitted(const TRowVersion&, ui64) override {
+ // We already use InvisibleRowSkips for these
+ }
+
+ void OnApplyCommitted(const TRowVersion& rowVersion) override {
+ Host->CheckReadConflict(TableId, rowVersion);
+ }
+
+ void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override {
+ Host->CheckReadConflict(TableId, rowVersion);
+ }
- // TODO: use observer to detect conflicts with other uncommitted transactions
+ private:
+ const TDataShardEngineHost* const Host;
+ const TTableId TableId;
+ };
- return nullptr;
+ void AddReadConflict(const TTableId& tableId, ui64 txId) const {
+ Y_UNUSED(tableId);
+ if (LockTxId) {
+ // We have detected uncommitted changes in txId that could affect
+ // our read result. We arrange a conflict that breaks our lock
+ // when txId commits.
+ Self->SysLocksTable().AddReadConflict(txId, LockTxId, LockNodeId);
+ }
+ }
+
+ void CheckReadConflict(const TTableId& tableId, const TRowVersion& rowVersion) const {
+ Y_UNUSED(tableId);
+ if (rowVersion > ReadVersion) {
+ // We are reading from snapshot at ReadVersion and should not normally
+ // observe changes with a version above that. However, if we have an
+ // uncommitted change, that we fake as committed for our own changes
+ // visibility, we might shadow some change that happened after a
+ // snapshot. This is a clear indication of a conflict between read
+ // and that future conflict, hence we must break locks and abort.
+ // TODO: add an actual abort
+ Self->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId);
+ }
+ }
+
+ void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> row) {
+ if (!Self->SysLocksTable().HasWriteLocks(tableId)) {
+ // We don't have any active write locks, so there's nothing we
+ // could possibly conflict with.
+ return;
+ }
+
+ const auto localTid = LocalTableId(tableId);
+ Y_VERIFY(localTid);
+ const TScheme::TTableInfo* tableInfo = Scheme.GetTableInfo(localTid);
+ TSmallVec<TRawTypeValue> key;
+ ConvertTableKeys(Scheme, tableInfo, row, key, nullptr);
+
+ // We are not actually interested in the row version, we only need to
+ // detect uncommitted transaction skips on the path to that version.
+ auto res = Db.SelectRowVersion(
+ localTid, key, /* readFlags */ 0,
+ GetReadTxMap(tableId),
+ new TWriteTxObserver(this, tableId));
+
+ if (res.Ready == NTable::EReady::Page) {
+ throw TNotReadyTabletException();
+ }
+ }
+
+ class TWriteTxObserver : public NTable::ITransactionObserver {
+ public:
+ TWriteTxObserver(const TDataShardEngineHost* host, const TTableId& tableId)
+ : Host(host)
+ , TableId(tableId)
+ {
+ Y_UNUSED(Host);
+ Y_UNUSED(TableId);
+ }
+
+ void OnSkipUncommitted(ui64 txId) override {
+ Host->AddWriteConflict(TableId, txId);
+ }
+
+ void OnSkipCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnSkipCommitted(const TRowVersion&, ui64) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&) override {
+ // nothing
+ }
+
+ void OnApplyCommitted(const TRowVersion&, ui64) override {
+ // nothing
+ }
+
+ private:
+ const TDataShardEngineHost* const Host;
+ const TTableId TableId;
+ };
+
+ void AddWriteConflict(const TTableId& tableId, ui64 txId) const {
+ Y_UNUSED(tableId);
+ if (LockTxId) {
+ Self->SysLocksTable().AddWriteConflict(txId, LockTxId, LockNodeId);
+ } else {
+ Self->SysLocksTable().BreakLock(txId);
+ }
}
private:
@@ -558,6 +707,8 @@ private:
TRowVersion WriteVersion = TRowVersion::Max();
TRowVersion ReadVersion = TRowVersion::Min();
mutable THashMap<TTableId, THolder<IChangeCollector>> ChangeCollectors;
+ mutable THashMap<TTableId, NTable::ITransactionMapPtr> TxMaps;
+ mutable THashMap<TTableId, NTable::ITransactionObserverPtr> TxObservers;
};
//
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index dc12c49959b..e5438459289 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -1,4 +1,5 @@
#include "datashard_txs.h"
+#include "datashard_locks_db.h"
#include <ydb/core/base/tx_processing.h>
#include <ydb/core/tablet/tablet_exception.h>
@@ -164,6 +165,9 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
PRECHARGE_SYS_TABLE(Schema::DstReplicationSourceOffsetsReceived);
PRECHARGE_SYS_TABLE(Schema::UserTablesStats);
PRECHARGE_SYS_TABLE(Schema::SchemaSnapshots);
+ PRECHARGE_SYS_TABLE(Schema::Locks);
+ PRECHARGE_SYS_TABLE(Schema::LockRanges);
+ PRECHARGE_SYS_TABLE(Schema::LockConflicts);
if (!ready)
return false;
@@ -479,6 +483,15 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}
+ if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
+ TDataShardLocksDb locksDb(*Self, txc);
+ if (!Self->SysLocks.Load(locksDb)) {
+ return false;
+ }
+ }
+
+ Self->SubscribeNewLocks();
+
return true;
}
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index cc0c87c6ee3..3145f6f50ec 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1,5 +1,7 @@
#include "datashard_impl.h"
#include "datashard_read_operation.h"
+#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
#include <ydb/core/formats/arrow_batch_builder.h>
@@ -789,16 +791,20 @@ public:
}
}
+ bool hadWrites = false;
+
if (Request->Record.HasLockTxId()) {
// note that we set locks only when first read finish transaction,
// i.e. we have read something without page faults
- AcquireLock(ctx, state);
+ hadWrites |= AcquireLock(txc, ctx, state);
}
- if (!Self->IsFollower())
- Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, readType, txc);
+ if (!Self->IsFollower()) {
+ auto res = Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, readType, txc);
+ hadWrites |= res.HadWrites;
+ }
- return EExecutionStatus::DelayComplete;
+ return hadWrites ? EExecutionStatus::DelayCompleteNoMoreRestarts : EExecutionStatus::DelayComplete;
}
void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override {
@@ -1268,17 +1274,18 @@ private:
ValidationInfo.Loaded = true;
}
- void AcquireLock(const TActorContext& ctx, TReadIteratorState& state) {
+ bool AcquireLock(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) {
auto& sysLocks = Self->SysLocksTable();
- auto& locker = sysLocks.GetLocker();
const auto lockTxId = state.Request->Record.GetLockTxId();
const auto lockNodeId = state.Request->Record.GetLockNodeId();
TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion);
- TLockInfo::TPtr lock;
state.LockTxId = lockTxId;
+ TDataShardLocksDb locksDb(*Self, txc);
+ TSetupSysLocks guard(lockTxId, lockNodeId, state.ReadVersion, *Self, &locksDb);
+
if (!state.Request->Keys.empty()) {
for (size_t i = 0; i < state.Request->Keys.size(); ++i) {
const auto& key = state.Request->Keys[i];
@@ -1289,62 +1296,47 @@ private:
true,
key.GetCells(),
true);
- TRangeKey rangeKey = locker.MakeRange(tableId, lockRange);
- lock = locker.AddRangeLock(lockTxId, lockNodeId, rangeKey, state.ReadVersion);
+ sysLocks.SetLock(tableId, lockRange, lockTxId, lockNodeId);
} else {
- TPointKey pointKey = locker.MakePoint(tableId, key.GetCells());
- lock = locker.AddPointLock(lockTxId, lockNodeId, pointKey, state.ReadVersion);
+ sysLocks.SetLock(tableId, key.GetCells(), lockTxId, lockNodeId);
}
}
} else {
- // since no keys, then we must have ranges (has been checked initially)
+ // no keys, so we must have ranges (has been checked initially)
for (size_t i = 0; i < state.Request->Ranges.size(); ++i) {
auto range = state.Request->Ranges[i].ToTableRange();
- TRangeKey rangeKey = locker.MakeRange(tableId, range);
- lock = locker.AddRangeLock(lockTxId, lockNodeId, rangeKey, state.ReadVersion);
+ sysLocks.SetLock(tableId, range, lockTxId, lockNodeId);
}
}
- ui64 counter;
- ui64 lockId;
- bool isBroken;
- if (lock) {
- counter = lock->GetCounter(state.ReadVersion);
- lockId = lock->GetLockId();
- isBroken = lock->IsBroken(state.ReadVersion);
- } else {
- counter = TSysTables::TLocksTable::TLock::ErrorNotSet;
- lockId = lockTxId;
- isBroken = true;
- }
-
- if (!isBroken && Reader->HasInvisibleRowSkips()) {
- locker.BreakLock(lockTxId, TRowVersion::Min());
- isBroken = true;
- counter = TSysTables::TLocksTable::TLock::ErrorAlreadyBroken;
+ if (Reader->HasInvisibleRowSkips()) {
+ sysLocks.BreakSetLocks(lockTxId, lockNodeId);
}
- sysLocks.UpdateCounters(counter);
+ auto locks = sysLocks.ApplyLocks();
- NKikimrTxDataShard::TLock *addLock;
- if (!isBroken) {
- addLock = Result->Record.AddTxLocks();
- } else {
- addLock = Result->Record.AddBrokenTxLocks();
- }
+ for (auto& lock : locks) {
+ NKikimrTxDataShard::TLock* addLock;
+ if (lock.IsError()) {
+ addLock = Result->Record.AddBrokenTxLocks();
+ } else {
+ addLock = Result->Record.AddTxLocks();
+ }
- addLock->SetLockId(lockId);
- addLock->SetDataShard(Self->TabletID());
- addLock->SetGeneration(Self->Generation());
- addLock->SetCounter(counter);
- addLock->SetSchemeShard(state.PathId.OwnerId);
- addLock->SetPathId(state.PathId.LocalPathId);
+ addLock->SetLockId(lock.LockId);
+ addLock->SetDataShard(lock.DataShard);
+ addLock->SetGeneration(lock.Generation);
+ addLock->SetCounter(lock.Counter);
+ addLock->SetSchemeShard(lock.SchemeShard);
+ addLock->SetPathId(lock.PathId);
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
- << " Acquired lock# " << lockId << ", counter# " << counter
- << " for " << state.PathId);
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
+ << " Acquired lock# " << lock.LockId << ", counter# " << lock.Counter
+ << " for " << state.PathId);
+ }
- state.Lock = lock; // note that might be nullptr
+ state.Lock = guard.Lock; // will be nullptr if broken
+ return locksDb.HasChanges();
}
};
@@ -1527,7 +1519,7 @@ public:
Result->Record,
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId);
- SendResult(ctx);
+ SendResult(txc, ctx);
return true;
}
auto userTableInfo = it->second;
@@ -1539,7 +1531,7 @@ public:
Ydb::StatusIds::SCHEME_ERROR,
TStringBuilder() << "Schema changed, current " << currentSchemaVersion
<< ", requested table schemaversion " << state.SchemaVersion);
- SendResult(ctx);
+ SendResult(txc, ctx);
return true;
}
@@ -1552,7 +1544,7 @@ public:
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Failed to get scheme for table local id: "
<< state.PathId.LocalPathId);
- SendResult(ctx);
+ SendResult(txc, ctx);
return true;
}
TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema);
@@ -1575,7 +1567,7 @@ public:
<< state.ReadVersion << " shard " << Self->TabletID()
<< " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark()
<< (Self->IsFollower() ? " RO replica" : ""));
- SendResult(ctx);
+ SendResult(txc, ctx);
return true;
}
@@ -1586,7 +1578,7 @@ public:
Result->Record,
Ydb::StatusIds::BAD_REQUEST,
p.second);
- SendResult(ctx);
+ SendResult(txc, ctx);
return true;
}
std::swap(BlockBuilder, p.first);
@@ -1600,7 +1592,7 @@ public:
Reader.reset(new TReader(state, *BlockBuilder, TableInfo));
if (Reader->Read(txc, ctx)) {
- SendResult(ctx);
+ SendResult(txc, ctx);
return true;
}
return false;
@@ -1610,7 +1602,7 @@ public:
// nothing to do
}
- void SendResult(const TActorContext& ctx) {
+ void SendResult(TTransactionContext& txc, const TActorContext& ctx) {
const auto* request = Ev->Get();
TReadIteratorId readId(request->Reader, request->ReadId);
auto it = Self->ReadIterators.find(readId);
@@ -1642,38 +1634,31 @@ public:
return;
}
- if (state.Lock && !state.ReportedLockBroken) {
- bool isBroken = false;
- ui64 counter;
- ui64 lockId;
- if (state.Lock->IsBroken(state.ReadVersion)) {
- isBroken = true;
- counter = state.Lock->GetCounter(state.ReadVersion);
- lockId = state.Lock->GetLockId();
-
- } else if (Reader->HasInvisibleRowSkips()) {
- isBroken = true;
- counter = TSysTables::TLocksTable::TLock::ErrorBroken;
- lockId = state.LockTxId;
-
+ if (state.Lock) {
+ bool isBroken = state.Lock->IsBroken(state.ReadVersion);
+ if (!isBroken && Reader->HasInvisibleRowSkips()) {
auto& sysLocks = Self->SysLocksTable();
- auto& locker = sysLocks.GetLocker();
- locker.BreakLock(state.LockTxId, TRowVersion::Min());
- sysLocks.UpdateCounters(counter);
+ TDataShardLocksDb locksDb(*Self, txc);
+ TSetupSysLocks guard(*Self, &locksDb);
+ sysLocks.BreakLock(state.Lock->GetLockId());
+ sysLocks.ApplyLocks();
+ Y_VERIFY(state.Lock->IsBroken());
+ isBroken = true;
}
if (isBroken) {
- state.ReportedLockBroken = true;
NKikimrTxDataShard::TLock *addLock = record.AddBrokenTxLocks();
- addLock->SetLockId(lockId);
+ addLock->SetLockId(state.Lock->GetLockId());
addLock->SetDataShard(Self->TabletID());
- addLock->SetGeneration(Self->Generation());
- addLock->SetCounter(counter);
+ addLock->SetGeneration(state.Lock->GetGeneration());
+ addLock->SetCounter(state.Lock->GetCounter(state.ReadVersion));
addLock->SetSchemeShard(state.PathId.OwnerId);
addLock->SetPathId(state.PathId.LocalPathId);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
- << " TTxReadContinue::Execute() found broken lock# " << lockId);
+ << " TTxReadContinue::Execute() found broken lock# " << state.Lock->GetLockId());
+
+ state.Lock = nullptr;
}
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index def43a6bff3..1a98d9ab77b 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -267,6 +267,7 @@ class TDataShard
friend class TS3DownloadsManager;
friend class TS3Downloader;
friend struct TSetupSysLocks;
+ friend class TDataShardLocksDb;
friend class TTxStartMvccStateChange;
friend class TTxExecuteMvccStateChange;
@@ -776,12 +777,44 @@ class TDataShard
using TColumns = TableColumns<PathOwnerId, LocalPathId, SchemaVersion, Step, TxId, Schema>;
};
+ struct Locks : Table<29> {
+ struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct LockNodeId : Column<2, NScheme::NTypeIds::Uint32> {};
+ struct Generation : Column<3, NScheme::NTypeIds::Uint32> {};
+ struct Counter : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct CreateTimestamp : Column<5, NScheme::NTypeIds::Uint64> {};
+ struct Flags : Column<6, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<LockId>;
+ using TColumns = TableColumns<LockId, LockNodeId, Generation, Counter, CreateTimestamp, Flags>;
+ };
+
+ struct LockRanges : Table<30> {
+ struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct RangeId : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct PathOwnerId : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct LocalPathId : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct Flags : Column<5, NScheme::NTypeIds::Uint64> {};
+ struct Data : Column<6, NScheme::NTypeIds::String> {};
+
+ using TKey = TableKey<LockId, RangeId>;
+ using TColumns = TableColumns<LockId, RangeId, PathOwnerId, LocalPathId, Flags, Data>;
+ };
+
+ struct LockConflicts : Table<31> {
+ struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct ConflictId : Column<2, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<LockId, ConflictId>;
+ using TColumns = TableColumns<LockId, ConflictId>;
+ };
+
using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue,
DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress,
Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts,
SrcChangeSenderActivations, DstChangeSenderActivations,
ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived,
- UserTablesStats, SchemaSnapshots>;
+ UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts>;
// These settings are persisted on each Init. So we use empty settings in order not to overwrite what
// was changed by the user
@@ -1558,6 +1591,7 @@ public:
void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx);
void SubscribeNewLocks(const TActorContext &ctx);
+ void SubscribeNewLocks();
private:
///
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index b49930a683c..9a784c24545 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -695,39 +695,6 @@ void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataS
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLockChanges: committing txId# " << txId << " in localTid# " << localTid);
txc.DB.CommitTx(localTid, txId);
}
- } else {
- KqpRollbackLockChanges(origin, tx, dataShard, txc);
- }
-}
-
-void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) {
- auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
-
- if (!kqpTx.HasLocks()) {
- return;
- }
-
- if (NeedEraseLocks(kqpTx.GetLocks().GetOp())) {
- for (auto& lockProto : kqpTx.GetLocks().GetLocks()) {
- if (lockProto.GetDataShard() != origin) {
- continue;
- }
-
- TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId());
- auto localTid = dataShard.GetLocalTableId(tableId);
- if (!localTid) {
- // It may have been dropped already
- continue;
- }
-
- auto txId = lockProto.GetLockId();
- if (!txc.DB.HasOpenTx(localTid, txId)) {
- continue;
- }
-
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpRollbackLockChanges: removing txId# " << txId << " from localTid# " << localTid);
- txc.DB.RemoveTx(localTid, txId);
- }
}
}
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 452fc92d337..1f45d28a1fe 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -38,7 +38,6 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc);
-void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc);
void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters);
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index b83770b18e1..2db6e835171 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -393,16 +393,12 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con
TouchTablePoint(tableId, key);
Shard->GetKeyAccessSampler()->AddSample(tableId, key);
- NTable::ITransactionMapPtr txMap;
- if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) {
- txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
- }
- // TODO: tx observer
-
NTable::TRowState dbRow;
NTable::TSelectStats stats;
ui64 flags = EngineHost.GetSettings().DisableByKeyFilter ? (ui64) NTable::NoByKey : 0;
- auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion(), txMap);
+ auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion(),
+ EngineHost.GetReadTxMap(tableId),
+ EngineHost.GetReadTxObserver(tableId));
kqpStats.NSelectRow = 1;
kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips;
@@ -454,14 +450,10 @@ TAutoPtr<NTable::TTableIt> TKqpDatashardComputeContext::CreateIterator(const TTa
keyRange.MinInclusive = range.InclusiveFrom;
keyRange.MaxInclusive = range.InclusiveTo;
- NTable::ITransactionMapPtr txMap;
- if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) {
- txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
- }
- // TODO: tx observer
-
TouchTableRange(tableId, range);
- return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion(), txMap);
+ return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion(),
+ EngineHost.GetReadTxMap(tableId),
+ EngineHost.GetReadTxObserver(tableId));
}
TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIterator(const TTableId& tableId,
@@ -481,14 +473,10 @@ TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIter
keyRange.MinInclusive = range.InclusiveFrom;
keyRange.MaxInclusive = range.InclusiveTo;
- NTable::ITransactionMapPtr txMap;
- if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) {
- txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
- }
- // TODO: tx observer
-
TouchTableRange(tableId, range);
- return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion(), txMap);
+ return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion(),
+ EngineHost.GetReadTxMap(tableId),
+ EngineHost.GetReadTxObserver(tableId));
}
template <typename TReadTableIterator>
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index 7b4181b0bc1..7385b514a7f 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -14,23 +14,54 @@ TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId)
: Locker(locker)
, LockId(lockId)
, LockNodeId(lockNodeId)
+ , Generation(locker->Generation())
, Counter(locker->IncCounter())
, CreationTime(TAppData::TimeProvider->Now())
{}
+TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs)
+ : Locker(locker)
+ , LockId(lockId)
+ , LockNodeId(lockNodeId)
+ , Generation(generation)
+ , Counter(counter)
+ , CreationTime(createTs)
+ , Persistent(true)
+{
+ if (counter == Max<ui64>()) {
+ BreakVersion.emplace(TRowVersion::Min());
+ }
+}
+
TLockInfo::~TLockInfo() {
- // nothing
+ if (!ConflictLocks.empty()) {
+ for (auto& pr : ConflictLocks) {
+ // Ensure there are no dangling pointers
+ pr.first->ConflictLocks.erase(this);
+ }
+ ConflictLocks.clear();
+ }
}
-void TLockInfo::AddShardLock(const THashSet<TPathId>& affectedTables) {
- AffectedTables.insert(affectedTables.begin(), affectedTables.end());
+void TLockInfo::MakeShardLock() {
ShardLock = true;
Points.clear();
Ranges.clear();
}
+bool TLockInfo::AddShardLock(const TPathId& pathId) {
+ Y_VERIFY(ShardLock);
+ if (ReadTables.insert(pathId).second) {
+ UnpersistedRanges = true;
+ return true;
+ }
+ return false;
+}
+
bool TLockInfo::AddPoint(const TPointKey& point) {
- AffectedTables.insert(point.Table->GetTableId());
+ if (ReadTables.insert(point.Table->GetTableId()).second) {
+ UnpersistedRanges = true;
+ }
if (!ShardLock) {
Points.emplace_back(point);
}
@@ -38,32 +69,217 @@ bool TLockInfo::AddPoint(const TPointKey& point) {
}
bool TLockInfo::AddRange(const TRangeKey& range) {
- AffectedTables.insert(range.Table->GetTableId());
+ if (ReadTables.insert(range.Table->GetTableId()).second) {
+ UnpersistedRanges = true;
+ }
if (!ShardLock) {
Ranges.emplace_back(range);
}
return !ShardLock;
}
-void TLockInfo::SetBroken(const TRowVersion& at) {
-#if 1 // optimisation: remove at next Remove
- if (!IsBroken(at))
- Locker->ScheduleLockCleanup(LockId, at);
-#endif
+bool TLockInfo::AddWriteLock(const TPathId& pathId) {
+ if (WriteTables.insert(pathId).second) {
+ UnpersistedRanges = true;
+ return true;
+ }
+ return false;
+}
- if (!BreakVersion || at < *BreakVersion)
- BreakVersion.emplace(at.Step, at.TxId);
+void TLockInfo::SetBroken(TRowVersion at) {
+ if (Persistent) {
+ // Persistent locks always break completely
+ at = TRowVersion::Min();
+ }
- if (at)
- return; // if break version is not TrowVersion::Min() we will postpone actual ranges removal
+ if (!IsBroken(at)) {
+ BreakVersion = at;
+ Locker->ScheduleRemoveBrokenRanges(LockId, at);
- Points.clear();
- Ranges.clear();
+ if (!at) {
+ // This lock is now broken in all versions, clear as soon as possible
+ Counter = Max<ui64>();
+ Points.clear();
+ Ranges.clear();
+ Locker->ScheduleBrokenLock(this);
+ }
+ }
+}
+
+void TLockInfo::PersistLock(ILocksDb* db) {
+ Y_VERIFY(!IsPersistent());
+ Y_VERIFY(db, "Cannot persist lock without a db");
+ db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds());
+ Persistent = true;
+
+ PersistRanges(db);
+ PersistConflicts(db);
+}
+
+void TLockInfo::PersistBrokenLock(ILocksDb* db) {
+ Y_VERIFY(IsPersistent());
+ Y_VERIFY(db, "Cannot persist lock without a db");
+ db->PersistLockCounter(LockId, Max<ui64>());
+}
+
+void TLockInfo::PersistRemoveLock(ILocksDb* db) {
+ Y_VERIFY(IsPersistent());
+ Y_VERIFY(db, "Cannot persist lock without a db");
+
+ // Remove persistent conflicts
+ for (auto& pr : ConflictLocks) {
+ TLockInfo* otherLock = pr.first;
+ if (otherLock->IsPersistent()) {
+ if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) {
+ db->PersistRemoveConflict(LockId, otherLock->LockId);
+ }
+ if (!!(pr.second & ELockConflictFlags::BreakUsOnTheirCommit)) {
+ db->PersistRemoveConflict(otherLock->LockId, LockId);
+ }
+ }
+ otherLock->ConflictLocks.erase(this);
+ }
+ ConflictLocks.clear();
+
+ // Remove persistent ranges
+ for (auto& range : PersistentRanges) {
+ db->PersistRemoveRange(LockId, range.Id);
+ }
+ PersistentRanges.clear();
+
+ // Remove the lock itself
+ db->PersistRemoveLock(LockId);
+}
+
+void TLockInfo::PersistRanges(ILocksDb* db) {
+ Y_VERIFY(IsPersistent());
+ if (UnpersistedRanges) {
+ for (const TPathId& pathId : ReadTables) {
+ PersistAddRange(pathId, ELockRangeFlags::Read, db);
+ }
+ for (const TPathId& pathId : WriteTables) {
+ PersistAddRange(pathId, ELockRangeFlags::Write, db);
+ }
+ UnpersistedRanges = false;
+ }
+}
+
+void TLockInfo::PersistAddRange(const TPathId& tableId, ELockRangeFlags flags, ILocksDb* db) {
+ Y_VERIFY(IsPersistent());
+ Y_VERIFY(db, "Cannot persist ranges without a db");
+ // We usually have a single range with flags, so linear search is ok
+ ui64 maxId = 0;
+ for (auto& range : PersistentRanges) {
+ if (range.TableId == tableId) {
+ auto prevFlags = range.Flags;
+ range.Flags |= flags;
+ if (range.Flags != prevFlags) {
+ db->PersistRangeFlags(LockId, range.Id, ui64(range.Flags));
+ }
+ return;
+ }
+ maxId = Max(maxId, range.Id);
+ }
+ auto& range = PersistentRanges.emplace_back();
+ range.Id = maxId + 1;
+ range.TableId = tableId;
+ range.Flags = flags;
+ db->PersistAddRange(LockId, range.Id, range.TableId, ui64(range.Flags));
+}
+
+void TLockInfo::AddConflict(TLockInfo* otherLock, ILocksDb* db) {
+ Y_VERIFY(this != otherLock, "Lock cannot conflict with itself");
+ Y_VERIFY(LockId != otherLock->LockId, "Unexpected conflict between a pair of locks with the same id");
+
+ auto& flags = ConflictLocks[otherLock];
+ if (!(flags & ELockConflictFlags::BreakThemOnOurCommit)) {
+ flags |= ELockConflictFlags::BreakThemOnOurCommit;
+ auto& otherFlags = otherLock->ConflictLocks[this];
+ otherFlags |= ELockConflictFlags::BreakUsOnTheirCommit;
+ if (IsPersistent() && otherLock->IsPersistent()) {
+ // Any conflict between persistent locks is also persistent
+ Y_VERIFY(db, "Cannot persist conflicts without a db");
+ db->PersistAddConflict(LockId, otherLock->LockId);
+ }
+ }
+}
+
+void TLockInfo::PersistConflicts(ILocksDb* db) {
+ Y_VERIFY(IsPersistent());
+ Y_VERIFY(db, "Cannot persist conflicts without a db");
+ for (auto& pr : ConflictLocks) {
+ TLockInfo* otherLock = pr.first;
+ if (!otherLock->IsPersistent()) {
+ // We don't persist non-persistent conflicts
+ continue;
+ }
+ if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) {
+ db->PersistAddConflict(LockId, otherLock->LockId);
+ }
+ if (!!(pr.second & ELockConflictFlags::BreakUsOnTheirCommit)) {
+ db->PersistAddConflict(otherLock->LockId, LockId);
+ }
+ }
+}
+
+void TLockInfo::CleanupConflicts() {
+ if (IsPersistent()) {
+ for (auto it = ConflictLocks.begin(); it != ConflictLocks.end();) {
+ TLockInfo* otherLock = it->first;
+ if (otherLock->IsPersistent()) {
+ // We keep persistent conflict in memory until lock is removed
+ ++it;
+ } else {
+ otherLock->ConflictLocks.erase(this);
+ ConflictLocks.erase(it++);
+ }
+ }
+ } else {
+ for (auto& pr : ConflictLocks) {
+ TLockInfo* otherLock = pr.first;
+ otherLock->ConflictLocks.erase(this);
+ }
+ ConflictLocks.clear();
+ }
+}
+
+void TLockInfo::RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags) {
+ auto& range = PersistentRanges.emplace_back();
+ range.Id = rangeId;
+ range.TableId = tableId;
+ range.Flags = flags;
+
+ if (!!(range.Flags & ELockRangeFlags::Read)) {
+ if (ReadTables.insert(range.TableId).second) {
+ ShardLock = true;
+ if (auto* table = Locker->FindTablePtr(range.TableId)) {
+ table->AddShardLock(this);
+ }
+ }
+ }
+ if (!!(range.Flags & ELockRangeFlags::Write)) {
+ if (WriteTables.insert(range.TableId).second) {
+ if (auto* table = Locker->FindTablePtr(range.TableId)) {
+ table->AddWriteLock(this);
+ }
+ }
+ }
+}
+
+void TLockInfo::RestorePersistentConflict(TLockInfo* otherLock) {
+ Y_VERIFY(IsPersistent() && otherLock->IsPersistent());
+
+ this->ConflictLocks[otherLock] |= ELockConflictFlags::BreakThemOnOurCommit;
+ otherLock->ConflictLocks[this] |= ELockConflictFlags::BreakUsOnTheirCommit;
}
// TTableLocks
-void TTableLocks::AddPointLock(const TPointKey& point, const TLockInfo::TPtr& lock) {
+void TTableLocks::AddShardLock(TLockInfo* lock) {
+ ShardLocks.insert(lock);
+}
+
+void TTableLocks::AddPointLock(const TPointKey& point, TLockInfo* lock) {
Y_VERIFY(lock->MayHavePointsAndRanges());
Y_VERIFY(point.Table == this);
TRangeTreeBase::TOwnedRange added(
@@ -71,10 +287,10 @@ void TTableLocks::AddPointLock(const TPointKey& point, const TLockInfo::TPtr& lo
true,
point.Key,
true);
- Ranges.AddRange(std::move(added), lock.Get());
+ Ranges.AddRange(std::move(added), lock);
}
-void TTableLocks::AddRangeLock(const TRangeKey& range, const TLockInfo::TPtr& lock) {
+void TTableLocks::AddRangeLock(const TRangeKey& range, TLockInfo* lock) {
Y_VERIFY(lock->MayHavePointsAndRanges());
Y_VERIFY(range.Table == this);
// FIXME: we have to force empty From/To to be inclusive due to outdated
@@ -88,60 +304,100 @@ void TTableLocks::AddRangeLock(const TRangeKey& range, const TLockInfo::TPtr& lo
range.InclusiveFrom || !range.From,
range.To,
range.InclusiveTo || !range.To);
- Ranges.AddRange(std::move(added), lock.Get());
+ Ranges.AddRange(std::move(added), lock);
}
-void TTableLocks::RemoveLock(const TLockInfo::TPtr& lock) {
- Ranges.RemoveRanges(lock.Get());
+void TTableLocks::AddWriteLock(TLockInfo* lock) {
+ WriteLocks.insert(lock);
}
-void TTableLocks::BreakLocks(TConstArrayRef<TCell> key, const TRowVersion& at) {
- Ranges.EachIntersection(key, [&](const TRangeTreeBase::TRange&, TLockInfo* lock) {
+void TTableLocks::RemoveReadLock(TLockInfo* lock) {
+ if (lock->IsShardLock()) {
+ RemoveShardLock(lock);
+ } else {
+ RemoveRangeLock(lock);
+ }
+}
+
+void TTableLocks::RemoveShardLock(TLockInfo* lock) {
+ ShardLocks.erase(lock);
+}
+
+void TTableLocks::RemoveRangeLock(TLockInfo* lock) {
+ Ranges.RemoveRanges(lock);
+}
+
+void TTableLocks::RemoveWriteLock(TLockInfo* lock) {
+ WriteLocks.erase(lock);
+}
+
+bool TTableLocks::BreakShardLocks(const TRowVersion& at) {
+ bool broken = false;
+ for (TLockInfo* lock : ShardLocks) {
lock->SetBroken(at);
- });
+ broken = true;
+ }
+ return broken;
}
-void TTableLocks::BreakAllLocks(const TRowVersion& at) {
+bool TTableLocks::BreakAllLocks(const TRowVersion& at) {
+ bool broken = false;
+ for (TLockInfo* lock : ShardLocks) {
+ lock->SetBroken(at);
+ broken = true;
+ }
Ranges.EachRange([&](const TRangeTreeBase::TRange&, TLockInfo* lock) {
lock->SetBroken(at);
+ broken = true;
});
+ return broken;
}
// TLockLocker
-TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at) {
- TLockInfo::TPtr lock = GetOrAddLock(lockTxId, lockNodeId);
- if (!lock || lock->IsBroken(at))
- return lock;
-
- ShardLocks.insert(lockTxId);
- for (const TPathId& tableId : lock->GetAffectedTables()) {
- Tables.at(tableId)->RemoveLock(lock);
+void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key) {
+ if (lock->AddPoint(key)) {
+ key.Table->AddPointLock(key, lock.Get());
+ } else {
+ key.Table->AddShardLock(lock.Get());
}
- lock->AddShardLock(affectedTables);
- return lock;
}
-TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, ui32 lockNodeId, const TPointKey& point, const TRowVersion& at) {
- TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId);
- if (!lock || lock->IsBroken(at))
- return lock;
-
- if (lock->AddPoint(point)) {
- point.Table->AddPointLock(point, lock);
+void TLockLocker::AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key) {
+ if (lock->AddRange(key)) {
+ key.Table->AddRangeLock(key, lock.Get());
+ } else {
+ key.Table->AddShardLock(lock.Get());
}
- return lock;
}
-TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, ui32 lockNodeId, const TRangeKey& range, const TRowVersion& at) {
- TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId);
- if (!lock || lock->IsBroken(at))
- return lock;
+void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) {
+ if (!lock->IsShardLock()) {
+ for (const TPathId& tableId : lock->GetReadTables()) {
+ Tables.at(tableId)->RemoveRangeLock(lock.Get());
+ }
+ lock->MakeShardLock();
+ for (const TPathId& tableId : lock->GetReadTables()) {
+ Tables.at(tableId)->AddShardLock(lock.Get());
+ }
+ }
+ for (auto& table : readTables) {
+ const TPathId& tableId = table.GetTableId();
+ Y_VERIFY(Tables.at(tableId).Get() == &table);
+ if (lock->AddShardLock(tableId)) {
+ table.AddShardLock(lock.Get());
+ }
+ }
+}
- if (lock->AddRange(range)) {
- range.Table->AddRangeLock(range, lock);
+void TLockLocker::AddWriteLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksWriteListTag>& writeTables) {
+ for (auto& table : writeTables) {
+ const TPathId& tableId = table.GetTableId();
+ Y_VERIFY(Tables.at(tableId).Get() == &table);
+ if (lock->AddWriteLock(tableId)) {
+ table.AddWriteLock(lock.Get());
+ }
}
- return lock;
}
TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& at) const {
@@ -154,30 +410,28 @@ TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& at) const
return nullptr;
}
-void TLockLocker::BreakShardLocks(const TRowVersion& at) {
- for (ui64 lockId : ShardLocks) {
- auto it = Locks.find(lockId);
- if (it != Locks.end()) {
- it->second->SetBroken(at);
- }
+void TLockLocker::BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& locks, const TRowVersion& at) {
+ for (auto& lock : locks) {
+ lock.SetBroken(at);
}
- if (!at)
- ShardLocks.clear();
+
RemoveBrokenRanges();
}
-void TLockLocker::BreakLocks(const TPointKey& point, const TRowVersion& at) {
- point.Table->BreakLocks(point.Key, at);
+void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at) {
+ for (auto& table : tables) {
+ table.BreakShardLocks(at);
+ }
+
RemoveBrokenRanges();
}
-void TLockLocker::BreakAllLocks(const TPathId& pathId, const TRowVersion& at) {
- auto it = Tables.find(pathId);
- if (it != Tables.end()) {
- it->second->BreakAllLocks(at);
- RemoveBrokenRanges();
-
+void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at) {
+ for (auto& table : tables) {
+ table.BreakAllLocks(at);
}
+
+ RemoveBrokenRanges();
}
void TLockLocker::RemoveBrokenRanges() {
@@ -186,13 +440,13 @@ void TLockLocker::RemoveBrokenRanges() {
if (it != Locks.end()) {
const TLockInfo::TPtr& lock = it->second;
- if (!lock->IsShardLock()) {
- for (const TPathId& tableId : lock->GetAffectedTables()) {
- Tables.at(tableId)->RemoveLock(lock);
- }
- } else {
- ShardLocks.erase(lockId);
+ for (const TPathId& tableId : lock->GetReadTables()) {
+ Tables.at(tableId)->RemoveReadLock(lock.Get());
}
+ for (const TPathId& tableId : lock->GetWriteTables()) {
+ Tables.at(tableId)->RemoveWriteLock(lock.Get());
+ }
+ lock->CleanupConflicts();
}
}
CleanupPending.clear();
@@ -209,17 +463,24 @@ void TLockLocker::RemoveBrokenRanges() {
if (it != Locks.end()) {
const TLockInfo::TPtr& lock = it->second;
+ if (lock->Counter == Max<ui64>()) {
+ // Skip locks that have been cleaned up already
+ continue;
+ }
+
lock->BreakVersion = TRowVersion::Min();
+ lock->Counter = Max<ui64>();
lock->Points.clear();
lock->Ranges.clear();
+ ScheduleBrokenLock(lock.Get());
- if (!lock->IsShardLock()) {
- for (const TPathId& tableId : lock->GetAffectedTables()) {
- Tables.at(tableId)->RemoveLock(lock);
- }
- } else {
- ShardLocks.erase(lockId);
+ for (const TPathId& tableId : lock->GetReadTables()) {
+ Tables.at(tableId)->RemoveReadLock(lock.Get());
+ }
+ for (const TPathId& tableId : lock->GetWriteTables()) {
+ Tables.at(tableId)->RemoveWriteLock(lock.Get());
}
+ lock->CleanupConflicts();
}
}
}
@@ -246,7 +507,18 @@ TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) {
return lock;
}
-void TLockLocker::RemoveOneLock(ui64 lockTxId) {
+TLockInfo::TPtr TLockLocker::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) {
+ Y_VERIFY(Locks.find(lockId) == Locks.end());
+
+ TLockInfo::TPtr lock = Limiter.AddLock(lockId, lockNodeId, generation, counter, createTs);
+ Locks[lockId] = lock;
+ if (lockNodeId) {
+ PendingSubscribeLocks.emplace_back(lockId, lockNodeId);
+ }
+ return lock;
+}
+
+void TLockLocker::RemoveOneLock(ui64 lockTxId, ILocksDb* db) {
auto it = Locks.find(lockTxId);
if (it != Locks.end()) {
TLockInfo::TPtr txLock = it->second;
@@ -257,44 +529,48 @@ void TLockLocker::RemoveOneLock(ui64 lockTxId) {
Self->IncCounter(COUNTER_LOCKS_REMOVED);
}
- if (!txLock->IsShardLock()) {
- for (const TPathId& tableId : txLock->GetAffectedTables()) {
- Tables.at(tableId)->RemoveLock(txLock);
- }
- } else {
- ShardLocks.erase(lockTxId);
+ if (txLock->InBrokenLocks) {
+ BrokenLocks.Remove(txLock.Get());
+ --BrokenLocksCount_;
+ }
+
+ for (const TPathId& tableId : txLock->GetReadTables()) {
+ Tables.at(tableId)->RemoveReadLock(txLock.Get());
}
+ for (const TPathId& tableId : txLock->GetWriteTables()) {
+ Tables.at(tableId)->RemoveWriteLock(txLock.Get());
+ }
+ txLock->CleanupConflicts();
Limiter.RemoveLock(lockTxId);
Locks.erase(it);
+ if (txLock->IsPersistent()) {
+ if (db) {
+ txLock->PersistRemoveLock(db);
+ } else {
+ Y_VERIFY(txLock->IsBroken(), "Scheduling persistent lock removal that is not broken");
+ RemovedPersistentLocks.push_back(txLock);
+ }
+ }
}
}
void TLockLocker::RemoveBrokenLocks() {
- for (ui64 lockId : BrokenLocks) {
- RemoveOneLock(lockId);
- }
- BrokenLocks.clear();
-
- if (BrokenCandidates.empty())
- return;
-
- auto till = Self->LastCompleteTxVersion();
- while (!BrokenCandidates.empty() && BrokenCandidates.top().Version <= till) {
- RemoveOneLock(BrokenCandidates.top().LockId);
- BrokenCandidates.pop();
+ RemoveBrokenRanges();
+ while (BrokenLocks) {
+ auto* lock = BrokenLocks.PopFront();
+ RemoveOneLock(lock->GetLockId());
}
}
-void TLockLocker::BreakLock(ui64 lockId, const TRowVersion& at) {
- if (auto lock = GetLock(lockId, at))
- lock->SetBroken(at);
-
- RemoveBrokenLocks();
+void TLockLocker::ForceBreakLock(ui64 lockId) {
+ if (auto lock = GetLock(lockId, TRowVersion::Min())) {
+ lock->SetBroken(TRowVersion::Min());
+ RemoveBrokenRanges();
+ }
}
-void TLockLocker::RemoveLock(ui64 lockId) {
- RemoveBrokenLocks();
- RemoveOneLock(lockId);
+void TLockLocker::RemoveLock(ui64 lockId, ILocksDb* db) {
+ RemoveOneLock(lockId, db);
}
void TLockLocker::UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo) {
@@ -309,29 +585,49 @@ void TLockLocker::RemoveSchema(const TPathId& tableId) {
Y_VERIFY(Tables.empty());
Locks.clear();
ShardLocks.clear();
- BrokenLocks.clear();
+ BrokenLocks.Clear();
CleanupPending.clear();
- BrokenCandidates.clear();
CleanupCandidates.clear();
+ PendingSubscribeLocks.clear();
+ RemovedPersistentLocks.clear();
+ Limiter.Clear();
}
+bool TLockLocker::ForceShardLock(const TPathId& tableId) const {
+ auto it = Tables.find(tableId);
+ if (it != Tables.end()) {
+ if (it->second->RangeCount() > TLockLimiter::LockLimit()) {
+ return true;
+ }
+ }
+ return false;
+}
-bool TLockLocker::ForceShardLock(const THashSet<TPathId>& rangeTables) const {
- for (const TPathId& tableId : rangeTables) {
- auto it = Tables.find(tableId);
- Y_VERIFY(it != Tables.end());
- if (it->second->RangeCount() > TLockLimiter::LockLimit())
+bool TLockLocker::ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const {
+ for (auto& table : readTables) {
+ if (table.RangeCount() > TLockLimiter::LockLimit())
return true;
}
return false;
}
-void TLockLocker::ScheduleLockCleanup(ui64 lockId, const TRowVersion& at) {
+void TLockLocker::ScheduleBrokenLock(TLockInfo* lock) {
+ auto it = Locks.find(lock->GetLockId());
+ Y_VERIFY(it != Locks.end() && it->second.Get() == lock,
+ "Sanity check: adding an unknown broken lock");
+ if (lock->IsPersistent()) {
+ BrokenPersistentLocks.PushBack(lock);
+ } else if (!lock->InBrokenLocks) {
+ BrokenLocks.PushBack(lock);
+ ++BrokenLocksCount_;
+ lock->InBrokenLocks = true;
+ }
+}
+
+void TLockLocker::ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at) {
if (at) {
- BrokenCandidates.emplace(lockId, at);
CleanupCandidates.emplace(lockId, at);
} else {
- BrokenLocks.push_back(lockId);
CleanupPending.push_back(lockId);
}
@@ -340,8 +636,15 @@ void TLockLocker::ScheduleLockCleanup(ui64 lockId, const TRowVersion& at) {
}
}
-void TLockLocker::RemoveSubscribedLock(ui64 lockId) {
- RemoveLock(lockId);
+void TLockLocker::RemoveSubscribedLock(ui64 lockId, ILocksDb* db) {
+ RemoveLock(lockId, db);
+}
+
+void TLockLocker::SaveBrokenPersistentLocks(ILocksDb* db) {
+ while (BrokenPersistentLocks) {
+ TLockInfo* lock = BrokenPersistentLocks.PopFront();
+ lock->PersistBrokenLock(db);
+ }
}
// TLockLocker.TLockLimiter
@@ -379,6 +682,15 @@ void TLockLocker::TLockLimiter::TouchLock(ui64 lockId) {
LocksQueue.Find(lockId);
}
+TLockInfo::TPtr TLockLocker::TLockLimiter::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) {
+ LocksQueue.Insert(lockId, createTs);
+ return TLockInfo::TPtr(new TLockInfo(Parent, lockId, lockNodeId, generation, counter, createTs));
+}
+
+void TLockLocker::TLockLimiter::Clear() {
+ LocksQueue.Clear();
+}
+
// TSysLocks
TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
@@ -386,101 +698,132 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
TMicrosecTimerCounter measureApplyLocks(*Self, COUNTER_APPLY_LOCKS_USEC);
- auto &checkVersion = Update->CheckVersion;
- auto &breakVersion = Update->BreakVersion;
+ // Note: we don't use CheckVersion here, because ApplyLocks is all about
+ // setting locks, not validating them. If the lock is broken in any
+ // version, then extending it is pointless: validation would be at
+ // some point in the future, where it is broken already.
+ TRowVersion breakVersion = Update->BreakVersion;
+
+ // TODO: move this somewhere earlier, like the start of a new update guard
+ Locker.RemoveBrokenRanges();
- if (Update->ShardBreak) {
- Locker.BreakShardLocks(breakVersion);
+ if (Update->BreakLocks) {
+ Locker.BreakLocks(Update->BreakLocks, breakVersion);
}
- if (Update->PointBreaks.size()) {
- for (const auto& key : Update->PointBreaks) {
- Locker.BreakLocks(key, breakVersion);
- }
+ if (Update->BreakShardLocks) {
+ Locker.BreakLocks(Update->BreakShardLocks, breakVersion);
}
- if (Update->AllBreaks.size()) {
- for (const auto& pathId : Update->AllBreaks) {
- Locker.BreakAllLocks(pathId, breakVersion);
- }
+ if (Update->BreakAllLocks) {
+ Locker.BreakLocks(Update->BreakAllLocks, breakVersion);
}
- if (!Update->Erases.empty() && Self->TabletCounters) {
- Self->IncCounter(COUNTER_LOCKS_ERASED, Update->Erases.size());
+ Locker.SaveBrokenPersistentLocks(Db);
+
+ // Merge shard lock conflicts into write conflicts, we do this once as an optimization
+ for (auto& table : Update->WriteConflictShardLocks) {
+ for (auto* lock : table.ShardLocks) {
+ if (lock->GetLockId() != Update->LockTxId) {
+ Update->WriteConflictLocks.PushBack(lock);
+ }
+ }
}
- for (ui64 lockId : Update->Erases) {
+ size_t erases = 0;
+ while (Update->EraseLocks) {
Y_VERIFY(!Update->HasLocks(), "Can't erase and set locks in one Tx");
- if (breakVersion)
- Locker.BreakLock(lockId, breakVersion);
- else
- Locker.RemoveLock(lockId);
+ auto* lock = Update->EraseLocks.PopFront();
+ Locker.RemoveLock(lock->GetLockId(), Db);
+ ++erases;
+ }
+
+ if (erases > 0 && Self->TabletCounters) {
+ Self->IncCounter(COUNTER_LOCKS_ERASED, erases);
}
- if (!Update->HasLocks())
+ if (!Update->HasLocks()) {
+ // Adding read/write conflicts implies locking
+ Y_VERIFY(!Update->ReadConflictLocks);
+ Y_VERIFY(!Update->WriteConflictLocks);
return TVector<TLock>();
+ }
- if (Locker.ForceShardLock(Update->RangeTables))
- Update->ShardLock = true;
+ bool shardLock = Locker.ForceShardLock(Update->ReadTables);
+ TLockInfo::TPtr lock;
ui64 counter = TLock::ErrorNotSet;
- ui32 numNotSet = 0;
+
if (Update->BreakOwn) {
counter = TLock::ErrorAlreadyBroken;
- } else if (Update->ShardLock) {
- TLockInfo::TPtr lock = Locker.AddShardLock(Update->LockTxId, Update->LockNodeId, Update->AffectedTables, checkVersion);
- if (lock) {
- Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter));
- counter = lock->GetCounter(checkVersion);
- } else {
- ++numNotSet;
- }
+ Locker.ForceBreakLock(Update->LockTxId);
+ Locker.SaveBrokenPersistentLocks(Db);
} else {
- for (const auto& key : Update->PointLocks) {
- TLockInfo::TPtr lock = Locker.AddPointLock(Update->LockTxId, Update->LockNodeId, key, checkVersion);
- if (lock) {
- Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter));
- counter = lock->GetCounter(checkVersion);
+ lock = Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId);
+ if (!lock) {
+ counter = TLock::ErrorTooMuch;
+ } else if (lock->IsBroken()) {
+ counter = TLock::ErrorBroken;
+ } else {
+ if (shardLock) {
+ Locker.AddShardLock(lock, Update->ReadTables);
+ if (Self->TabletCounters) {
+ Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);
+ }
} else {
- ++numNotSet;
+ for (const auto& key : Update->PointLocks) {
+ Locker.AddPointLock(lock, key);
+ }
+ for (const auto& key : Update->RangeLocks) {
+ Locker.AddRangeLock(lock, key);
+ }
}
- }
+ if (Update->WriteTables) {
+ Locker.AddWriteLock(lock, Update->WriteTables);
+ }
+ counter = lock->GetCounter();
+ Update->Lock = lock;
- for (const auto& key : Update->RangeLocks) {
- TLockInfo::TPtr lock = Locker.AddRangeLock(Update->LockTxId, Update->LockNodeId, key, checkVersion);
- if (lock) {
- Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter));
- counter = lock->GetCounter(checkVersion);
- } else {
- ++numNotSet;
+ if (lock->IsPersistent()) {
+ lock->PersistRanges(Db);
+ }
+ for (auto& readConflictLock : Update->ReadConflictLocks) {
+ readConflictLock.AddConflict(lock.Get(), Db);
+ }
+ for (auto& writeConflictLock : Update->WriteConflictLocks) {
+ lock->AddConflict(&writeConflictLock, Db);
}
- }
- }
- if (numNotSet) {
- counter = TLock::ErrorTooMuch;
+ if (lock->GetWriteTables() && !lock->IsPersistent()) {
+ // We need to persist a new lock
+ lock->PersistLock(Db);
+ }
+ }
}
UpdateCounters(counter);
// We have to tell client that there were some locks (even if we don't set them)
TVector<TLock> out;
- out.reserve(Update->AffectedTables.size());
- for (const TPathId& pathId : Update->AffectedTables) {
- out.emplace_back(MakeLock(Update->LockTxId, counter, pathId));
+ for (auto& table : Update->AffectedTables) {
+ out.emplace_back(MakeLock(Update->LockTxId, lock ? lock->GetGeneration() : Self->Generation(), counter, table.GetTableId()));
}
return out;
}
-void TSysLocks::UpdateCounters(ui64 counter) {
+void TSysLocks::UpdateCounters() {
if (!Self->TabletCounters)
return;
Self->IncCounter(COUNTER_LOCKS_ACTIVE_PER_SHARD, LocksCount());
Self->IncCounter(COUNTER_LOCKS_BROKEN_PER_SHARD, BrokenLocksCount());
- if (Update && Update->ShardLock) {
- Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD);
- }
+}
+
+void TSysLocks::UpdateCounters(ui64 counter) {
+ if (!Self->TabletCounters)
+ return;
+
+ UpdateCounters();
if (TLock::IsError(counter)) {
if (TLock::IsBroken(counter)) {
@@ -519,17 +862,17 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const {
auto &checkVersion = Update->CheckVersion;
TLockInfo::TPtr txLock = Locker.GetLock(lockTxId, checkVersion);
if (txLock) {
- const auto& tableIds = txLock->GetAffectedTables();
+ const auto& tableIds = txLock->GetReadTables();
if (key.size() == 2) { // locks v1
Y_VERIFY(tableIds.size() == 1);
- return MakeAndLogLock(lockTxId, txLock->GetCounter(checkVersion), *tableIds.begin());
+ return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), *tableIds.begin());
} else { // locks v2
Y_VERIFY(key.size() == 4);
TPathId tableId;
ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::SchemeShard, tableId.OwnerId);
ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::PathId, tableId.LocalPathId);
if (ok && tableId && tableIds.contains(tableId))
- return MakeAndLogLock(lockTxId, txLock->GetCounter(checkVersion), tableId);
+ return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), tableId);
}
}
@@ -539,7 +882,9 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const {
void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) {
Y_VERIFY(Update);
- Update->EraseLock(GetLockId(key));
+ if (auto* lock = Locker.FindLockPtr(GetLockId(key))) {
+ Update->AddEraseLock(lock);
+ }
}
void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
@@ -549,7 +894,7 @@ void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& k
if (lockTxId) {
Y_VERIFY(Update);
- Update->SetLock(tableId, Locker.MakePoint(tableId, key), lockTxId, lockNodeId);
+ Update->AddPointLock(Locker.MakePoint(tableId, key), lockTxId, lockNodeId);
}
}
@@ -565,18 +910,82 @@ void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64
if (lockTxId) {
Y_VERIFY(Update);
- Update->SetLock(tableId, Locker.MakeRange(tableId, range), lockTxId, lockNodeId);
+ Update->AddRangeLock(Locker.MakeRange(tableId, range), lockTxId, lockNodeId);
}
}
-void TSysLocks::BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key) {
- Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks)));
+void TSysLocks::SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
+ Y_VERIFY(!TSysTables::IsSystemTable(tableId));
if (!Self->IsUserTable(tableId))
return;
- if (Locker.TableHasLocks(tableId))
- Update->BreakLocks(Locker.MakePoint(tableId, key));
- Update->BreakShardLock();
+ if (auto* table = Locker.FindTablePtr(tableId)) {
+ Update->AddWriteLock(table, lockTxId, lockNodeId);
+ AddWriteConflict(tableId, key, lockTxId, lockNodeId);
+ }
+}
+
+void TSysLocks::BreakLock(ui64 lockId) {
+ if (auto* lock = Locker.FindLockPtr(lockId)) {
+ Update->AddBreakLock(lock);
+ }
+}
+
+void TSysLocks::BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+ Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks)));
+
+ if (auto* table = Locker.FindTablePtr(tableId)) {
+ if (table->HasRangeLocks()) {
+ // Note: avoid copying the key, find all locks here
+ table->Ranges.EachIntersection(key, [update = Update](const TRangeTreeBase::TRange&, TLockInfo* lock) {
+ update->AddBreakLock(lock);
+ });
+ }
+ if (table->HasShardLocks()) {
+ // We also want to break all shard locks in this table
+ Update->AddBreakShardLocks(table);
+ }
+ }
+}
+
+void TSysLocks::AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) {
+ Y_UNUSED(lockNodeId);
+
+ if (conflictId != lockTxId) {
+ if (auto* lock = Locker.FindLockPtr(conflictId)) {
+ Update->AddReadConflictLock(lock);
+ }
+ }
+}
+
+void TSysLocks::AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) {
+ Y_UNUSED(lockNodeId);
+
+ if (conflictId != lockTxId) {
+ if (auto* lock = Locker.FindLockPtr(conflictId)) {
+ Update->AddWriteConflictLock(lock);
+ }
+ }
+}
+
+void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) {
+ Y_UNUSED(lockTxId);
+ Y_UNUSED(lockNodeId);
+
+ if (auto* table = Locker.FindTablePtr(tableId)) {
+ if (table->HasRangeLocks()) {
+ // Note: avoid copying the key, find all locks here
+ table->Ranges.EachIntersection(key, [update = Update](const TRangeTreeBase::TRange&, TLockInfo* lock) {
+ if (lock->GetLockId() != update->LockTxId) {
+ update->AddWriteConflictLock(lock);
+ }
+ });
+ }
+ if (table->HasShardLocks()) {
+ // We also want to conflict with all shard locks in this table
+ Update->AddWriteConflictShardLocks(table);
+ }
+ }
}
void TSysLocks::BreakAllLocks(const TTableId& tableId) {
@@ -584,9 +993,11 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) {
if (!Self->IsUserTable(tableId))
return;
- if (Locker.TableHasLocks(tableId))
- Update->BreakAllLocks(tableId);
- Update->BreakShardLock();
+ if (auto* table = Locker.FindTablePtr(tableId)) {
+ if (table->HasRangeLocks() || table->HasShardLocks()) {
+ Update->AddBreakAllLocks(table);
+ }
+ }
}
void TSysLocks::BreakSetLocks(ui64 lockTxId, ui32 lockNodeId) {
@@ -602,23 +1013,77 @@ bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const {
return ok && (Self->TabletID() == tabletId);
}
-TSysLocks::TLock TSysLocks::MakeLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const {
+bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const {
+ if (auto* lock = Locker.FindLockPtr(lockId)) {
+ return lock->WriteTables.contains(tableId.PathId);
+ }
+
+ return false;
+}
+
+bool TSysLocks::HasWriteLocks(const TTableId& tableId) const {
+ if (auto* table = Locker.FindTablePtr(tableId.PathId)) {
+ return !table->WriteLocks.empty();
+ }
+
+ return false;
+}
+
+bool TSysLocks::MayAddLock(ui64 lockId) const {
+ if (auto* lock = Locker.FindLockPtr(lockId)) {
+ // We may expand the lock unless it's broken
+ return !lock->IsBroken();
+ }
+
+ Y_VERIFY(Db, "MayAddLock needs a valid locks database");
+ return Db->MayAddLock(lockId);
+}
+
+TSysLocks::TLock TSysLocks::MakeLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const {
TLock lock;
lock.LockId = lockTxId;
lock.DataShard = Self->TabletID();
- lock.Generation = Self->Generation();
+ lock.Generation = generation;
lock.Counter = counter;
lock.SchemeShard = pathId.OwnerId;
lock.PathId = pathId.LocalPathId;
return lock;
}
-TSysLocks::TLock TSysLocks::MakeAndLogLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const {
- TLock lock = MakeLock(lockTxId, counter, pathId);
+TSysLocks::TLock TSysLocks::MakeAndLogLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const {
+ TLock lock = MakeLock(lockTxId, generation, counter, pathId);
if (AccessLog)
AccessLog->Locks[lockTxId] = lock;
return lock;
}
+bool TSysLocks::Load(ILocksDb& db) {
+ TVector<ILocksDb::TLockRow> rows;
+ if (!db.Load(rows)) {
+ return false;
+ }
+
+ Locker.Clear();
+
+ for (auto& lockRow : rows) {
+ TLockInfo::TPtr lock = Locker.AddLock(lockRow.LockId, lockRow.LockNodeId, lockRow.Generation, lockRow.Counter, TInstant::MicroSeconds(lockRow.CreateTs));
+ for (auto& rangeRow : lockRow.Ranges) {
+ lock->RestorePersistentRange(rangeRow.RangeId, rangeRow.TableId, ELockRangeFlags(rangeRow.Flags));
+ }
+ }
+
+ for (auto& lockRow : rows) {
+ auto* lock = Locker.FindLockPtr(lockRow.LockId);
+ Y_VERIFY(lock);
+ for (ui64 conflictId : lockRow.Conflicts) {
+ if (auto* otherLock = Locker.FindLockPtr(conflictId)) {
+ lock->RestorePersistentConflict(otherLock);
+ }
+ }
+ }
+
+ return true;
+}
+
}}
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 8217e6cbbde..6cb67bfe8cc 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -20,6 +20,53 @@ namespace NDataShard {
struct TUserTable;
+class ILocksDb {
+protected:
+ ~ILocksDb() = default;
+
+public:
+ struct TLockRange {
+ ui64 RangeId;
+ TPathId TableId;
+ ui64 Flags;
+ TString Data;
+ };
+
+ struct TLockRow {
+ ui64 LockId;
+ ui32 LockNodeId;
+ ui32 Generation;
+ ui64 Counter;
+ ui64 CreateTs;
+ ui64 Flags;
+
+ TVector<TLockRange> Ranges;
+ TVector<ui64> Conflicts;
+ };
+
+ virtual bool Load(TVector<TLockRow>& rows) = 0;
+
+ // Returns true when a new lock may be added with the given lockId
+ // Sometimes new lock cannot be added, e.g. when it had uncommitted changes
+ // in the past, and adding anything with the same lockId would conflict
+ // with previous decisions.
+ virtual bool MayAddLock(ui64 lockId) = 0;
+
+ // Persist adding/removing a lock info
+ virtual void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) = 0;
+ virtual void PersistLockCounter(ui64 lockId, ui64 counter) = 0;
+ virtual void PersistRemoveLock(ui64 lockId) = 0;
+
+ // Persist adding/removing info on locked ranges
+ virtual void PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags = 0, const TString& data = {}) = 0;
+ virtual void PersistRangeFlags(ui64 lockId, ui64 rangeId, ui64 flags) = 0;
+ virtual void PersistRemoveRange(ui64 lockId, ui64 rangeId) = 0;
+
+ // Persist a conflict, i.e. this lock must break some other lock on commit
+ virtual void PersistAddConflict(ui64 lockId, ui64 otherLockId) = 0;
+ virtual void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) = 0;
+};
+
class TLocksDataShard {
public:
TLocksDataShard(TTabletCountersBase* const &tabletCounters)
@@ -99,6 +146,7 @@ private:
class TLockInfo;
class TTableLocks;
class TLockLocker;
+class TSysLocks;
///
struct TPointKey {
@@ -152,23 +200,73 @@ struct TPendingSubscribeLock {
}
};
+enum class ELockConflictFlags : ui8 {
+ None = 0,
+ BreakThemOnOurCommit = 1,
+ BreakUsOnTheirCommit = 2,
+};
+
+using ELockConflictFlagsRaw = std::underlying_type<ELockConflictFlags>::type;
+
+inline ELockConflictFlags operator|(ELockConflictFlags a, ELockConflictFlags b) { return ELockConflictFlags(ELockConflictFlagsRaw(a) | ELockConflictFlagsRaw(b)); }
+inline ELockConflictFlags operator&(ELockConflictFlags a, ELockConflictFlags b) { return ELockConflictFlags(ELockConflictFlagsRaw(a) & ELockConflictFlagsRaw(b)); }
+inline ELockConflictFlags& operator|=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a | b; }
+inline ELockConflictFlags& operator&=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a & b; }
+inline bool operator!(ELockConflictFlags c) { return ELockConflictFlagsRaw(c) == 0; }
+
+enum class ELockRangeFlags : ui8 {
+ None = 0,
+ Read = 1,
+ Write = 2,
+};
+
+using ELockRangeFlagsRaw = std::underlying_type<ELockRangeFlags>::type;
+
+inline ELockRangeFlags operator|(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
+inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
+inline ELockRangeFlags& operator|=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a | b; }
+inline ELockRangeFlags& operator&=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a & b; }
+inline bool operator!(ELockRangeFlags c) { return ELockRangeFlagsRaw(c) == 0; }
+
+// Tags for various intrusive lists
+struct TLockInfoBreakListTag {};
+struct TLockInfoEraseListTag {};
+struct TLockInfoReadConflictListTag {};
+struct TLockInfoWriteConflictListTag {};
+struct TLockInfoBrokenListTag {};
+struct TLockInfoBrokenPersistentListTag {};
+
/// Aggregates shard, point and range locks
-class TLockInfo : public TSimpleRefCount<TLockInfo> {
+class TLockInfo
+ : public TSimpleRefCount<TLockInfo>
+ , public TIntrusiveListItem<TLockInfo, TLockInfoBreakListTag>
+ , public TIntrusiveListItem<TLockInfo, TLockInfoEraseListTag>
+ , public TIntrusiveListItem<TLockInfo, TLockInfoReadConflictListTag>
+ , public TIntrusiveListItem<TLockInfo, TLockInfoWriteConflictListTag>
+ , public TIntrusiveListItem<TLockInfo, TLockInfoBrokenListTag>
+ , public TIntrusiveListItem<TLockInfo, TLockInfoBrokenPersistentListTag>
+{
friend class TTableLocks;
friend class TLockLocker;
+ friend class TSysLocks;
public:
using TPtr = TIntrusivePtr<TLockInfo>;
TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId);
+ TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
~TLockInfo();
+ ui32 GetGeneration() const { return Generation; }
ui64 GetCounter(const TRowVersion& at = TRowVersion::Max()) const { return !BreakVersion || at < *BreakVersion ? Counter : Max<ui64>(); }
bool IsBroken(const TRowVersion& at = TRowVersion::Max()) const { return GetCounter(at) == Max<ui64>(); }
size_t NumPoints() const { return Points.size(); }
size_t NumRanges() const { return Ranges.size(); }
bool IsShardLock() const { return ShardLock; }
+ bool IsWriteLock() const { return !WriteTables.empty(); }
+ bool IsPersistent() const { return Persistent; }
+ bool HasUnpersistedRanges() const { return UnpersistedRanges; }
//ui64 MemorySize() const { return 1; } // TODO
bool MayHavePointsAndRanges() const { return !ShardLock && (!BreakVersion || *BreakVersion); }
@@ -177,33 +275,84 @@ public:
ui32 GetLockNodeId() const { return LockNodeId; }
TInstant GetCreationTime() const { return CreationTime; }
- const THashSet<TPathId>& GetAffectedTables() const { return AffectedTables; }
+ const THashSet<TPathId>& GetReadTables() const { return ReadTables; }
+ const THashSet<TPathId>& GetWriteTables() const { return WriteTables; }
const TVector<TPointKey>& GetPoints() const { return Points; }
const TVector<TRangeKey>& GetRanges() const { return Ranges; }
+ void PersistLock(ILocksDb* db);
+ void PersistBrokenLock(ILocksDb* db);
+ void PersistRemoveLock(ILocksDb* db);
+
+ void PersistRanges(ILocksDb* db);
+
+ void AddConflict(TLockInfo* otherLock, ILocksDb* db);
+ void PersistConflicts(ILocksDb* db);
+ void CleanupConflicts();
+
+ void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags);
+ void RestorePersistentConflict(TLockInfo* otherLock);
+
private:
- void AddShardLock(const THashSet<TPathId>& affectedTables);
+ void MakeShardLock();
+ bool AddShardLock(const TPathId& pathId);
bool AddPoint(const TPointKey& point);
bool AddRange(const TRangeKey& range);
- void SetBroken(const TRowVersion& at);
+ bool AddWriteLock(const TPathId& pathId);
+ void SetBroken(TRowVersion at);
+
+ void PersistAddRange(const TPathId& tableId, ELockRangeFlags flags, ILocksDb* db);
+
+private:
+ struct TPersistentRange {
+ ui64 Id;
+ TPathId TableId;
+ ELockRangeFlags Flags;
+ };
private:
TLockLocker * Locker;
ui64 LockId;
ui32 LockNodeId;
+ ui32 Generation;
ui64 Counter;
TInstant CreationTime;
- THashSet<TPathId> AffectedTables;
+ THashSet<TPathId> ReadTables;
+ THashSet<TPathId> WriteTables;
TVector<TPointKey> Points;
TVector<TRangeKey> Ranges;
bool ShardLock = false;
+ bool Persistent = false;
+ bool UnpersistedRanges = false;
+ bool InBrokenLocks = false;
std::optional<TRowVersion> BreakVersion;
+
+ // A set of locks we must break on commit
+ THashMap<TLockInfo*, ELockConflictFlags> ConflictLocks;
+ TVector<TPersistentRange> PersistentRanges;
};
+struct TTableLocksReadListTag {};
+struct TTableLocksWriteListTag {};
+struct TTableLocksAffectedListTag {};
+struct TTableLocksBreakShardListTag {};
+struct TTableLocksBreakAllListTag {};
+struct TTableLocksWriteConflictShardListTag {};
+
///
-class TTableLocks : public TSimpleRefCount<TTableLocks> {
+class TTableLocks
+ : public TSimpleRefCount<TTableLocks>
+ , public TIntrusiveListItem<TTableLocks, TTableLocksReadListTag>
+ , public TIntrusiveListItem<TTableLocks, TTableLocksWriteListTag>
+ , public TIntrusiveListItem<TTableLocks, TTableLocksAffectedListTag>
+ , public TIntrusiveListItem<TTableLocks, TTableLocksBreakShardListTag>
+ , public TIntrusiveListItem<TTableLocks, TTableLocksBreakAllListTag>
+ , public TIntrusiveListItem<TTableLocks, TTableLocksWriteConflictShardListTag>
+{
+ friend class TSysLocks;
+
public:
using TPtr = TIntrusivePtr<TTableLocks>;
@@ -215,11 +364,16 @@ public:
TPathId GetTableId() const { return TableId; }
- void AddPointLock(const TPointKey& point, const TLockInfo::TPtr& lock);
- void AddRangeLock(const TRangeKey& range, const TLockInfo::TPtr& lock);
- void RemoveLock(const TLockInfo::TPtr& lock);
- void BreakLocks(TConstArrayRef<TCell> key, const TRowVersion& at);
- void BreakAllLocks(const TRowVersion& at);
+ void AddShardLock(TLockInfo* lock);
+ void AddPointLock(const TPointKey& point, TLockInfo* lock);
+ void AddRangeLock(const TRangeKey& range, TLockInfo* lock);
+ void AddWriteLock(TLockInfo* lock);
+ void RemoveReadLock(TLockInfo* lock);
+ void RemoveShardLock(TLockInfo* lock);
+ void RemoveRangeLock(TLockInfo* lock);
+ void RemoveWriteLock(TLockInfo* lock);
+ bool BreakShardLocks(const TRowVersion& at);
+ bool BreakAllLocks(const TRowVersion& at);
ui64 NumKeyColumns() const {
return KeyColumnTypes.size();
@@ -238,21 +392,28 @@ public:
}
}
- bool HasLocks() const { return Ranges.Size() > 0; }
+ bool HasShardLocks() const { return !ShardLocks.empty(); }
+ bool HasRangeLocks() const { return Ranges.Size() > 0; }
ui64 RangeCount() const { return Ranges.Size(); }
void Clear() {
Ranges.Clear();
+ ShardLocks.clear();
+ WriteLocks.clear();
}
private:
const TPathId TableId;
TVector<NScheme::TTypeId> KeyColumnTypes;
TRangeTreap<TLockInfo*> Ranges;
+ THashSet<TLockInfo*> ShardLocks;
+ THashSet<TLockInfo*> WriteLocks;
};
/// Owns and manages locks
class TLockLocker {
+ friend class TSysLocks;
+
public:
/// Prevent unlimited lock's count growth
class TLockLimiter {
@@ -279,6 +440,9 @@ public:
void RemoveLock(ui64 lockId);
void TouchLock(ui64 lockId);
+ TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
+ void Clear();
+
// TODO: AddPoint, AddRange
private:
@@ -299,25 +463,45 @@ public:
Tables.clear();
}
- TLockInfo::TPtr AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at);
- TLockInfo::TPtr AddPointLock(ui64 lockTxId, ui32 lockNodeId, const TPointKey& key, const TRowVersion& at);
- TLockInfo::TPtr AddRangeLock(ui64 lockTxId, ui32 lockNodeId, const TRangeKey& key, const TRowVersion& at);
+ void AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key);
+ void AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key);
+ void AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables);
+ void AddWriteLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksWriteListTag>& writeTables);
+
TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at) const;
ui64 LocksCount() const { return Locks.size(); }
- ui64 BrokenLocksCount() const { return BrokenLocks.size() + BrokenCandidates.size(); }
+ ui64 BrokenLocksCount() const { return BrokenLocksCount_; }
+
+ void BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& locks, const TRowVersion& at);
+ void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at);
+ void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at);
+ void ForceBreakLock(ui64 lockId);
+ void RemoveLock(ui64 lockTxId, ILocksDb* db);
+
+ TLockInfo* FindLockPtr(ui64 lockId) const {
+ auto it = Locks.find(lockId);
+ if (it != Locks.end()) {
+ return it->second.Get();
+ } else {
+ return nullptr;
+ }
+ }
- void BreakShardLocks(const TRowVersion& at);
- void BreakLocks(const TPointKey& point, const TRowVersion& at);
- void BreakAllLocks(const TPathId& pathId, const TRowVersion& at);
- void BreakLock(ui64 lockTxId, const TRowVersion& at);
- void RemoveLock(ui64 lockTxId);
+ TTableLocks* FindTablePtr(const TTableId& tableId) const {
+ auto it = Tables.find(tableId.PathId);
+ if (it != Tables.end()) {
+ return it->second.Get();
+ } else {
+ return nullptr;
+ }
+ }
- bool TableHasLocks(const TTableId& tableId) const {
+ bool TableHasRangeLocks(const TTableId& tableId) const {
auto it = Tables.find(tableId.PathId);
if (it == Tables.end())
return false;
- return it->second->HasLocks();
+ return it->second->HasRangeLocks();
}
TPointKey MakePoint(const TTableId& tableId, TConstArrayRef<TCell> point) const {
@@ -340,10 +524,11 @@ public:
void UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo);
void RemoveSchema(const TPathId& tableId);
- bool ForceShardLock(const THashSet<TPathId>& rangeTables) const;
+ bool ForceShardLock(const TPathId& tableId) const;
+ bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const;
- // optimisation: set to remove broken lock at next Remove()
- void ScheduleLockCleanup(ui64 lockId, const TRowVersion& at);
+ void ScheduleBrokenLock(TLockInfo* lock);
+ void ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at);
TPendingSubscribeLock NextPendingSubscribeLock() {
TPendingSubscribeLock result;
@@ -354,20 +539,43 @@ public:
return result;
}
- void RemoveSubscribedLock(ui64 lockId);
+ void RemoveSubscribedLock(ui64 lockId, ILocksDb* db);
+ ui32 Generation() const { return Self->Generation(); }
ui64 IncCounter() { return Counter++; };
+ TVector<TLockInfo::TPtr>& GetRemovedPersistentLocks() {
+ return RemovedPersistentLocks;
+ }
+
+ void Clear() {
+ for (auto& pr : Tables) {
+ pr.second->Clear();
+ }
+ Locks.clear();
+ ShardLocks.clear();
+ BrokenLocks.Clear();
+ CleanupPending.clear();
+ CleanupCandidates.clear();
+ PendingSubscribeLocks.clear();
+ RemovedPersistentLocks.clear();
+ Limiter.Clear();
+ }
+
private:
THolder<TLocksDataShard> Self;
THashMap<ui64, TLockInfo::TPtr> Locks; // key is LockId
THashMap<TPathId, TTableLocks::TPtr> Tables;
THashSet<ui64> ShardLocks;
- TVector<ui64> BrokenLocks; // LockIds of broken locks (optimisation)
- TVector<ui64> CleanupPending; // LockIds of broken locks with pending cleanup
- TPriorityQueue<TVersionedLockId> BrokenCandidates;
+ // A list of broken, but not yet removed locks
+ TIntrusiveList<TLockInfo, TLockInfoBrokenPersistentListTag> BrokenPersistentLocks;
+ TIntrusiveList<TLockInfo, TLockInfoBrokenListTag> BrokenLocks;
+ size_t BrokenLocksCount_ = 0;
+ // A queue of locks that need their ranges to be cleaned up
+ TVector<ui64> CleanupPending;
TPriorityQueue<TVersionedLockId> CleanupCandidates;
TList<TPendingSubscribeLock> PendingSubscribeLocks;
+ TVector<TLockInfo::TPtr> RemovedPersistentLocks;
TLockLimiter Limiter;
ui64 Counter;
@@ -380,71 +588,91 @@ private:
void RemoveBrokenRanges();
TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId);
- void RemoveOneLock(ui64 lockId);
+ TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs);
+ void RemoveOneLock(ui64 lockId, ILocksDb* db = nullptr);
void RemoveBrokenLocks();
+
+ void SaveBrokenPersistentLocks(ILocksDb* db);
};
/// A portion of locks update
struct TLocksUpdate {
ui64 LockTxId = 0;
ui32 LockNodeId = 0;
- TVector<TPointKey> PointLocks;
- TVector<TRangeKey> RangeLocks;
- TVector<TPointKey> PointBreaks;
- THashSet<TPathId> AllBreaks;
- TVector<ui64> Erases;
- bool ShardLock = false;
- bool ShardBreak = false;
- THashSet<TPathId> AffectedTables;
- THashSet<TPathId> RangeTables;
+ TLockInfo::TPtr Lock;
+
+ TStackVec<TPointKey, 4> PointLocks;
+ TStackVec<TRangeKey, 4> RangeLocks;
+
+ TIntrusiveList<TTableLocks, TTableLocksReadListTag> ReadTables;
+ TIntrusiveList<TTableLocks, TTableLocksWriteListTag> WriteTables;
+ TIntrusiveList<TTableLocks, TTableLocksAffectedListTag> AffectedTables;
+
+ TIntrusiveList<TLockInfo, TLockInfoBreakListTag> BreakLocks;
+ TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag> BreakShardLocks;
+ TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag> BreakAllLocks;
+
+ TIntrusiveList<TLockInfo, TLockInfoReadConflictListTag> ReadConflictLocks;
+ TIntrusiveList<TLockInfo, TLockInfoWriteConflictListTag> WriteConflictLocks;
+ TIntrusiveList<TTableLocks, TTableLocksWriteConflictShardListTag> WriteConflictShardLocks;
+
+ TIntrusiveList<TLockInfo, TLockInfoEraseListTag> EraseLocks;
TRowVersion CheckVersion = TRowVersion::Max();
TRowVersion BreakVersion = TRowVersion::Min();
bool BreakOwn = false;
- void Clear() {
- LockTxId = 0;
- LockNodeId = 0;
- ShardLock = false;
- ShardBreak = false;
- PointLocks.clear();
- PointBreaks.clear();
- AllBreaks.clear();
- Erases.clear();
- }
-
bool HasLocks() const {
- return ShardLock || PointLocks.size() || RangeLocks.size();
+ return bool(AffectedTables);
}
- void SetLock(const TTableId& tableId, const TRangeKey& range, ui64 lockId, ui32 lockNodeId) {
+ void AddRangeLock(const TRangeKey& range, ui64 lockId, ui32 lockNodeId) {
Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
- AffectedTables.insert(tableId.PathId);
- RangeTables.insert(tableId.PathId);
+ ReadTables.PushBack(range.Table.Get());
+ AffectedTables.PushBack(range.Table.Get());
RangeLocks.push_back(range);
}
- void SetLock(const TTableId& tableId, const TPointKey& key, ui64 lockId, ui32 lockNodeId) {
+ void AddPointLock(const TPointKey& key, ui64 lockId, ui32 lockNodeId) {
Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
- AffectedTables.insert(tableId.PathId);
+ ReadTables.PushBack(key.Table.Get());
+ AffectedTables.PushBack(key.Table.Get());
PointLocks.push_back(key);
}
- void BreakLocks(const TPointKey& key) {
- PointBreaks.push_back(key);
+ void AddWriteLock(TTableLocks* table, ui64 lockId, ui32 lockNodeId) {
+ Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId);
+ WriteTables.PushBack(table);
+ AffectedTables.PushBack(table);
+ }
+
+ void AddBreakLock(TLockInfo* lock) {
+ BreakLocks.PushBack(lock);
}
- void BreakAllLocks(const TTableId& tableId) {
- AllBreaks.insert(tableId.PathId);
+ void AddBreakShardLocks(TTableLocks* table) {
+ BreakShardLocks.PushBack(table);
}
- void BreakShardLock() {
- ShardBreak = true;
+ void AddBreakAllLocks(TTableLocks* table) {
+ BreakAllLocks.PushBack(table);
}
- void EraseLock(ui64 lockId) {
- Erases.push_back(lockId);
+ void AddReadConflictLock(TLockInfo* lock) {
+ ReadConflictLocks.PushBack(lock);
+ }
+
+ void AddWriteConflictLock(TLockInfo* lock) {
+ WriteConflictLocks.PushBack(lock);
+ }
+
+ void AddWriteConflictShardLocks(TTableLocks* table) {
+ WriteConflictShardLocks.PushBack(table);
+ }
+
+ void AddEraseLock(TLockInfo* lock) {
+ EraseLocks.PushBack(lock);
}
void BreakSetLocks(ui64 lockId, ui32 lockNodeId) {
@@ -467,23 +695,24 @@ public:
TSysLocks(const T * self)
: Self(new TLocksDataShardAdapter<T>(self))
, Locker(self)
- , Update(nullptr)
- , AccessLog(nullptr)
- , Cache(nullptr)
{}
- void SetTxUpdater(TLocksUpdate * up) {
+ void SetTxUpdater(TLocksUpdate* up) {
Update = up;
}
- void SetAccessLog(TLocksCache *log) {
+ void SetAccessLog(TLocksCache* log) {
AccessLog = log;
}
- void SetCache(TLocksCache *cache) {
+ void SetCache(TLocksCache* cache) {
Cache = cache;
}
+ void SetDb(ILocksDb* db) {
+ Db = db;
+ }
+
ui64 CurrentLockTxId() const {
Y_VERIFY(Update);
return Update->LockTxId;
@@ -503,10 +732,18 @@ public:
void EraseLock(const TArrayRef<const TCell>& syslockKey);
void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId);
+ void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
+ void BreakLock(ui64 lockId);
void BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
+ void AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId);
+ void AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId);
+ void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId);
void BreakAllLocks(const TTableId& tableId);
void BreakSetLocks(ui64 lockTxId, ui32 lockNodeId);
bool IsMyKey(const TArrayRef<const TCell>& key) const;
+ bool HasWriteLock(ui64 lockId, const TTableId& tableId) const;
+ bool HasWriteLocks(const TTableId& tableId) const;
+ bool MayAddLock(ui64 lockId) const;
ui64 LocksCount() const { return Locker.LocksCount(); }
ui64 BrokenLocksCount() const { return Locker.BrokenLocksCount(); }
@@ -526,25 +763,25 @@ public:
return Locker.NextPendingSubscribeLock();
}
- void RemoveSubscribedLock(ui64 lockId) {
- Locker.RemoveSubscribedLock(lockId);
+ void RemoveSubscribedLock(ui64 lockId, ILocksDb* db) {
+ Locker.RemoveSubscribedLock(lockId, db);
}
+ void UpdateCounters();
void UpdateCounters(ui64 counter);
- // to avoid filling intermidiate Update
- // Note: don't forget to call UpdateCounters() when finish with locker
- TLockLocker& GetLocker() { return Locker; }
+ bool Load(ILocksDb& db);
private:
THolder<TLocksDataShard> Self;
TLockLocker Locker;
- TLocksUpdate * Update;
- TLocksCache *AccessLog;
- TLocksCache *Cache;
+ TLocksUpdate* Update = nullptr;
+ TLocksCache* AccessLog = nullptr;
+ TLocksCache* Cache = nullptr;
+ ILocksDb* Db = nullptr;
- TLock MakeLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const;
- TLock MakeAndLogLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const;
+ TLock MakeLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const;
+ TLock MakeAndLogLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const;
static ui64 GetLockId(const TArrayRef<const TCell>& key) {
ui64 lockId;
diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp
new file mode 100644
index 00000000000..3c74047d8be
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_locks_db.cpp
@@ -0,0 +1,167 @@
+#include "datashard_locks_db.h"
+
+namespace NKikimr::NDataShard {
+
+bool TDataShardLocksDb::Load(TVector<TLockRow>& rows) {
+ using Schema = TDataShard::Schema;
+
+ NIceDb::TNiceDb db(DB);
+
+ rows.clear();
+
+ // Load locks
+ THashMap<ui64, size_t> lockIndex;
+ {
+ auto rowset = db.Table<Schema::Locks>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ auto& lock = rows.emplace_back();
+ lock.LockId = rowset.GetValue<Schema::Locks::LockId>();
+ lock.LockNodeId = rowset.GetValue<Schema::Locks::LockNodeId>();
+ lock.Generation = rowset.GetValue<Schema::Locks::Generation>();
+ lock.Counter = rowset.GetValue<Schema::Locks::Counter>();
+ lock.CreateTs = rowset.GetValue<Schema::Locks::CreateTimestamp>();
+ lock.Flags = rowset.GetValue<Schema::Locks::Flags>();
+ lockIndex[lock.LockId] = rows.size() - 1;
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
+ // Load ranges
+ {
+ auto rowset = db.Table<Schema::LockRanges>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ auto lockId = rowset.GetValue<Schema::LockRanges::LockId>();
+ auto it = lockIndex.find(lockId);
+ if (it != lockIndex.end()) {
+ auto& lock = rows[it->second];
+ auto& range = lock.Ranges.emplace_back();
+ range.RangeId = rowset.GetValue<Schema::LockRanges::RangeId>();
+ range.TableId.OwnerId = rowset.GetValue<Schema::LockRanges::PathOwnerId>();
+ range.TableId.LocalPathId = rowset.GetValue<Schema::LockRanges::LocalPathId>();
+ range.Flags = rowset.GetValue<Schema::LockRanges::Flags>();
+ range.Data = rowset.GetValue<Schema::LockRanges::Data>();
+ }
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
+ // Load conflicts
+ {
+ auto rowset = db.Table<Schema::LockConflicts>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ auto lockId = rowset.GetValue<Schema::LockConflicts::LockId>();
+ auto it = lockIndex.find(lockId);
+ if (it != lockIndex.end()) {
+ auto& lock = rows[it->second];
+ lock.Conflicts.push_back(rowset.GetValue<Schema::LockConflicts::ConflictId>());
+ }
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+bool TDataShardLocksDb::MayAddLock(ui64 lockId) {
+ for (auto& pr : Self.GetUserTables()) {
+ auto tid = pr.second->LocalTid;
+ // We cannot start a new lockId if it has any uncompacted data
+ if (DB.HasTxData(tid, lockId)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void TDataShardLocksDb::PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::Locks>().Key(lockId).Update(
+ NIceDb::TUpdate<Schema::Locks::LockNodeId>(lockNodeId),
+ NIceDb::TUpdate<Schema::Locks::Generation>(generation),
+ NIceDb::TUpdate<Schema::Locks::Counter>(counter),
+ NIceDb::TUpdate<Schema::Locks::CreateTimestamp>(createTs),
+ NIceDb::TUpdate<Schema::Locks::Flags>(flags));
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::Locks>().Key(lockId).Update(
+ NIceDb::TUpdate<Schema::Locks::Counter>(counter));
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
+ for (auto& pr : Self.GetUserTables()) {
+ auto tid = pr.second->LocalTid;
+ // Removing the lock also removes any uncommitted data
+ if (DB.HasOpenTx(tid, lockId)) {
+ DB.RemoveTx(tid, lockId);
+ }
+ }
+
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::Locks>().Key(lockId).Delete();
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags, const TString& data) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::LockRanges>().Key(lockId, rangeId).Update(
+ NIceDb::TUpdate<Schema::LockRanges::PathOwnerId>(tableId.OwnerId),
+ NIceDb::TUpdate<Schema::LockRanges::LocalPathId>(tableId.LocalPathId),
+ NIceDb::TUpdate<Schema::LockRanges::Flags>(flags),
+ NIceDb::TUpdate<Schema::LockRanges::Data>(data));
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistRangeFlags(ui64 lockId, ui64 rangeId, ui64 flags) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::LockRanges>().Key(lockId, rangeId).Update(
+ NIceDb::TUpdate<Schema::LockRanges::Flags>(flags));
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistRemoveRange(ui64 lockId, ui64 rangeId) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::LockRanges>().Key(lockId, rangeId).Delete();
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistAddConflict(ui64 lockId, ui64 otherLockId) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::LockConflicts>().Key(lockId, otherLockId).Update();
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistRemoveConflict(ui64 lockId, ui64 otherLockId) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::LockConflicts>().Key(lockId, otherLockId).Delete();
+ HasChanges_ = true;
+}
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_locks_db.h b/ydb/core/tx/datashard/datashard_locks_db.h
new file mode 100644
index 00000000000..ea2b2cd3b4f
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_locks_db.h
@@ -0,0 +1,43 @@
+#pragma once
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+class TDataShardLocksDb
+ : public ILocksDb
+{
+public:
+ TDataShardLocksDb(TDataShard& self, TTransactionContext& txc)
+ : Self(self)
+ , DB(txc.DB)
+ { }
+
+ bool HasChanges() const {
+ return HasChanges_;
+ }
+
+ bool Load(TVector<TLockRow>& rows) override;
+
+ bool MayAddLock(ui64 lockId) override;
+
+ // Persist adding/removing a lock info
+ void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) override;
+ void PersistLockCounter(ui64 lockId, ui64 counter) override;
+ void PersistRemoveLock(ui64 lockId) override;
+
+ // Persist adding/removing info on locked ranges
+ void PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags = 0, const TString& data = {}) override;
+ void PersistRangeFlags(ui64 lockId, ui64 rangeId, ui64 flags) override;
+ void PersistRemoveRange(ui64 lockId, ui64 rangeId) override;
+
+ // Persist a conflict, i.e. this lock must break some other lock on commit
+ void PersistAddConflict(ui64 lockId, ui64 otherLockId) override;
+ void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) override;
+
+private:
+ TDataShard& Self;
+ NTable::TDatabase& DB;
+ bool HasChanges_ = false;
+};
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index e5c976dd352..d398647ff9e 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -147,6 +147,64 @@ namespace NKqpHelpers {
return request;
}
+ inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query) {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ if (response.GetResponse().GetResults().size() == 0) {
+ return "<empty>";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ }
+
+ inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ txId = response.GetResponse().GetTxMeta().id();
+ if (response.GetResponse().GetResults().size() == 0) {
+ return "<empty>";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ }
+
+ inline TString KqpSimpleContinue(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ if (response.GetResponse().GetResults().size() == 0) {
+ return "<empty>";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ }
+
+ inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeCommitRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ if (response.GetResponse().GetResults().size() == 0) {
+ return "<empty>";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ }
+
} // namespace NKqpHelpers
} // namespace NDataShard
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp
index 6bc76ddf396..e7724d352f0 100644
--- a/ydb/core/tx/datashard/datashard_ut_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp
@@ -275,9 +275,9 @@ namespace NTest {
static void RemoveLock(TLockTester& tester, ui64 lockTxId) {
TLocksUpdate txLocks;
- tester.StartTx(lockTxId, txLocks);
+ tester.StartTx(txLocks);
- txLocks.EraseLock(lockTxId);
+ tester.EraseLock(lockTxId);
tester.ApplyTxLocks();
}
@@ -362,19 +362,9 @@ Y_UNIT_TEST(MvccTestOutdatedLocksRemove) {
tester.PromoteCompleteVersion(TRowVersion(1, 10));
{
- // lock is ready to be deleted but still in place
- TLocksUpdate update;
- update.CheckVersion = TRowVersion(1, 5);
- tester.StartTx(update);
- UNIT_ASSERT(tester.CheckLock(10));
- tester.ApplyTxLocks();
- }
-
- {
- // erase triggers outdated locks cleanup
+ // next lock operation causes cleanup
TLocksUpdate update;
tester.StartTx(update);
- tester.EraseLock(12);
tester.ApplyTxLocks();
}
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 1da604332a4..92c2dfa267c 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1471,95 +1471,27 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"} Struct { Bool: false }");
}
- Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) {
- TPortManager pm;
- TServerSettings::TControls controls;
- controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
- controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
- controls.MutableDataShardControls()->SetEnableLockedWrites(1);
-
- TServerSettings serverSettings(pm.GetPort(2134));
- serverSettings.SetDomainName("Root")
- .SetUseRealThreads(false)
- .SetEnableMvcc(true)
- .SetEnableMvccSnapshotReads(true)
- .SetEnableKqpSessionActor(UseNewEngine)
- .SetControls(controls);
-
- Tests::TServer::TPtr server = new TServer(serverSettings);
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
- runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
-
- InitRoot(server, sender);
-
- TDisableDataShardLogBatching disableDataShardLogBatching;
- CreateShardedTable(server, sender, "/Root", "table-1", 1);
-
- ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
-
- SimulateSleep(server, TDuration::Seconds(1));
-
- auto execSimpleRequest = [&](const TString& query, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), expectedStatus);
- if (response.GetResponse().GetResults().size() == 0) {
- return "";
- }
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
- };
-
- auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- sessionId = CreateSession(runtime, reqSender);
- auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
- auto& response = ev->Get()->Record.GetRef();
- UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- txId = response.GetResponse().GetTxMeta().id();
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
- };
+ struct TLockSnapshot {
+ ui64 LockId = 0;
+ ui32 LockNodeId = 0;
+ TRowVersion MvccSnapshot = TRowVersion::Min();
+ };
- auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
- auto& response = ev->Get()->Record.GetRef();
- if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
- }
- if (response.GetResponse().GetResults().size() == 0) {
- return "";
- }
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
- };
+ class TInjectLockSnapshotObserver {
+ public:
+ TInjectLockSnapshotObserver(TTestActorRuntime& runtime)
+ : Runtime(runtime)
+ {
+ PrevObserver = runtime.SetObserverFunc([this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ return this->Process(ev);
+ });
+ }
- auto commitSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
- auto reqSender = runtime.AllocateEdgeActor();
- auto ev = ExecRequest(runtime, reqSender, MakeCommitRequest(sessionId, txId, query));
- auto& response = ev->Get()->Record.GetRef();
- if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
- return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
- }
- if (response.GetResponse().GetResults().size() == 0) {
- return "";
- }
- UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
- return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
- };
+ ~TInjectLockSnapshotObserver() {
+ Runtime.SetObserverFunc(PrevObserver);
+ }
- ui64 lastLockTxId = 0;
- ui32 lastLockNodeId = 0;
- TRowVersion lastMvccSnapshot = TRowVersion::Min();
- ui64 injectLockTxId = 0;
- ui32 injectLockNodeId = 0;
- TRowVersion injectMvccSnapshot = TRowVersion::Min();
- auto capturePropose = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ TTestActorRuntime::EEventAction Process(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvDataShard::TEvProposeTransaction::EventType: {
auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record;
@@ -1590,37 +1522,79 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
}
}
+ Last = {};
if (tx.GetLockTxId()) {
- lastLockTxId = tx.GetLockTxId();
- lastLockNodeId = tx.GetLockNodeId();
- } else if (injectLockTxId) {
- tx.SetLockTxId(injectLockTxId);
- if (injectLockNodeId) {
- tx.SetLockNodeId(injectLockNodeId);
+ Last.LockId = tx.GetLockTxId();
+ Last.LockNodeId = tx.GetLockNodeId();
+ } else if (Inject.LockId) {
+ tx.SetLockTxId(Inject.LockId);
+ if (Inject.LockNodeId) {
+ tx.SetLockNodeId(Inject.LockNodeId);
}
TString txBody;
Y_VERIFY(tx.SerializeToString(&txBody));
record.SetTxBody(txBody);
}
if (record.HasMvccSnapshot()) {
- lastMvccSnapshot.Step = record.GetMvccSnapshot().GetStep();
- lastMvccSnapshot.TxId = record.GetMvccSnapshot().GetTxId();
- } else if (injectMvccSnapshot) {
- record.MutableMvccSnapshot()->SetStep(injectMvccSnapshot.Step);
- record.MutableMvccSnapshot()->SetTxId(injectMvccSnapshot.TxId);
+ Last.MvccSnapshot.Step = record.GetMvccSnapshot().GetStep();
+ Last.MvccSnapshot.TxId = record.GetMvccSnapshot().GetTxId();
+ } else if (Inject.MvccSnapshot) {
+ record.MutableMvccSnapshot()->SetStep(Inject.MvccSnapshot.Step);
+ record.MutableMvccSnapshot()->SetTxId(Inject.MvccSnapshot.TxId);
}
}
break;
}
}
- return TTestActorRuntime::EEventAction::PROCESS;
- };
- auto prevObserverFunc = runtime.SetObserverFunc(capturePropose);
+ return PrevObserver(Runtime, ev);
+ }
+
+ private:
+ TTestActorRuntime& Runtime;
+ TTestActorRuntime::TEventObserver PrevObserver;
+
+ public:
+ TLockSnapshot Last;
+ TLockSnapshot Inject;
+ };
+
+ Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
// Start a snapshot read transaction
TString sessionId, txId;
UNIT_ASSERT_VALUES_EQUAL(
- beginSnapshotRequest(sessionId, txId, Q_(R"(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
SELECT key, value FROM `/Root/table-1`
WHERE key >= 1 AND key <= 3
ORDER BY key
@@ -1630,25 +1604,18 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"} Struct { Bool: false }");
// We should have been acquiring locks
- Y_VERIFY(lastLockTxId != 0);
- ui64 snapshotLockTxId = lastLockTxId;
- ui32 snapshotLockNodeId = lastLockNodeId;
- Y_VERIFY(lastMvccSnapshot);
- auto snapshotVersion = lastMvccSnapshot;
+ TLockSnapshot snapshot = observer.Last;
+ Y_VERIFY(snapshot.LockId != 0);
+ Y_VERIFY(snapshot.MvccSnapshot);
// Perform an immediate write, pretending it happens as part of the above snapshot tx
- injectLockTxId = snapshotLockTxId;
- injectLockNodeId = snapshotLockNodeId;
- injectMvccSnapshot = snapshotVersion;
+ observer.Inject = snapshot;
UNIT_ASSERT_VALUES_EQUAL(
- execSimpleRequest(Q_(R"(
+ KqpSimpleExec(runtime, Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)
- )"),
- UseNewEngine ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::UNAVAILABLE),
- "");
- injectLockTxId = 0;
- injectLockNodeId = 0;
- injectMvccSnapshot = TRowVersion::Min();
+ )")),
+ UseNewEngine ? "<empty>" : "ERROR: UNAVAILABLE");
+ observer.Inject = {};
// Old engine doesn't support LockNodeId
// There's nothing to test unless we can write uncommitted data
@@ -1659,7 +1626,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
// Start another snapshot read, it should not see above write (it's uncommitted)
TString sessionId2, txId2;
UNIT_ASSERT_VALUES_EQUAL(
- beginSnapshotRequest(sessionId2, txId2, Q_(R"(
+ KqpSimpleBegin(runtime, sessionId2, txId2, Q_(R"(
SELECT key, value FROM `/Root/table-1`
WHERE key >= 1 AND key <= 3
ORDER BY key
@@ -1670,7 +1637,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
// Perform another read using the first snapshot tx, it must see its own writes
UNIT_ASSERT_VALUES_EQUAL(
- continueSnapshotRequest(sessionId, txId, Q_(R"(
+ KqpSimpleContinue(runtime, sessionId, txId, Q_(R"(
SELECT key, value FROM `/Root/table-1`
WHERE key >= 1 AND key <= 3
ORDER BY key
@@ -1682,17 +1649,17 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
// Now commit with additional changes (temporarily needed to trigger lock commits)
UNIT_ASSERT_VALUES_EQUAL(
- commitSnapshotRequest(sessionId, txId, Q_(R"(
+ KqpSimpleCommit(runtime, sessionId, txId, Q_(R"(
UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3)
)")),
- "");
+ "<empty>");
if (UseNewEngine) {
// Verify new snapshots observe all committed changes
// This is only possible with new engine at this time
TString sessionId3, txId3;
UNIT_ASSERT_VALUES_EQUAL(
- beginSnapshotRequest(sessionId3, txId3, Q_(R"(
+ KqpSimpleBegin(runtime, sessionId3, txId3, Q_(R"(
SELECT key, value FROM `/Root/table-1`
WHERE key >= 1 AND key <= 3
ORDER BY key
@@ -1705,6 +1672,120 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
}
+ Y_UNIT_TEST(MvccSnapshotLockedWritesRestart) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We should have been acquiring locks
+ TLockSnapshot snapshot = observer.Last;
+ Y_VERIFY(snapshot.LockId != 0);
+ Y_VERIFY(snapshot.MvccSnapshot);
+
+ // Perform an immediate write, pretending it happens as part of the above snapshot tx
+ // We expect read lock to be upgraded to write lock and become persistent
+ observer.Inject = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)
+ )")),
+ "<empty>");
+ observer.Inject = {};
+
+ // Reboot tablet, persistent locks must not be lost
+ RebootTablet(runtime, shards1[0], sender);
+
+ // Start another snapshot read, it should not see above write (it's uncommitted)
+ TString sessionId2, txId2;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId2, txId2, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Perform another read using the first snapshot tx, it must see its own writes
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleContinue(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "} Struct { Bool: false }");
+
+ // Now commit with additional changes (temporarily needed to trigger lock commits)
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3)
+ )")),
+ "<empty>");
+
+ // Verify new snapshots observe all committed changes
+ // This is only possible with new engine at this time
+ TString sessionId3, txId3;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId3, txId3, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index d237bbdbcb6..a9d840cace0 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -2,6 +2,7 @@
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
namespace NKikimr {
namespace NDataShard {
@@ -29,7 +30,8 @@ public:
op->MvccReadWriteVersion.reset();
}
- TSetupSysLocks guardLocks(op, DataShard);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
Y_VERIFY(tx != nullptr);
diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
index 0ae61b8af80..e1949849105 100644
--- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
@@ -3,6 +3,7 @@
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
namespace NKikimr {
namespace NDataShard {
@@ -29,7 +30,8 @@ public:
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
- TSetupSysLocks guardLocks(op, DataShard);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
const auto& commitTx = tx->GetCommitWritesTx()->GetBody();
const auto versions = DataShard.GetReadWriteVersions(op.Get());
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index b6ac426d252..4f9d2066994 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -1,6 +1,7 @@
#include "datashard_kqp.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
namespace NKikimr {
@@ -65,7 +66,8 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
op->MvccReadWriteVersion.reset();
}
- TSetupSysLocks guardLocks(op, DataShard);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
@@ -161,9 +163,14 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_WAIT_TOTAL_LATENCY_MS, waitTotalLatency.MilliSeconds());
op->ResetCurrentTimer();
- if (op->IsReadOnly())
+ if (op->IsReadOnly() && !locksDb.HasChanges())
return EExecutionStatus::Executed;
+ if (locksDb.HasChanges()) {
+ // We made some changes to locks db, make sure we wait for commit
+ op->SetWaitCompletionFlag(true);
+ }
+
return EExecutionStatus::ExecutedNoMoreRestarts;
}
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index bc6fda112e0..d15c7e551cf 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -5,6 +5,7 @@
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
#include <util/generic/bitmap.h>
@@ -35,7 +36,8 @@ public:
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
- TSetupSysLocks guardLocks(op, DataShard);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
const auto& eraseTx = tx->GetDistributedEraseTx();
const auto& request = eraseTx->GetRequest();
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index b60562af1a2..5ff35cd413e 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -3,6 +3,7 @@
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
+#include "datashard_locks_db.h"
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/rm/kqp_rm.h>
@@ -67,7 +68,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
op->MvccReadWriteVersion.reset();
}
- TSetupSysLocks guardLocks(op, DataShard);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ TSetupSysLocks guardLocks(op, DataShard, &locksDb);
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
@@ -134,10 +136,13 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) {
- KqpRollbackLockChanges(tabletId, tx, DataShard, txc);
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
DataShard.SysLocksTable().ApplyLocks();
DataShard.SubscribeNewLocks(ctx);
+ if (locksDb.HasChanges()) {
+ op->SetWaitCompletionFlag(true);
+ return EExecutionStatus::ExecutedNoMoreRestarts;
+ }
return EExecutionStatus::Executed;
}
@@ -226,7 +231,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
DataShard.IncCounter(COUNTER_WAIT_TOTAL_LATENCY_MS, waitTotalLatency.MilliSeconds());
op->ResetCurrentTimer();
- return op->IsReadOnly() ? EExecutionStatus::Executed : EExecutionStatus::ExecutedNoMoreRestarts;
+ if (op->IsReadOnly() && !locksDb.HasChanges()) {
+ return EExecutionStatus::Executed;
+ }
+
+ if (locksDb.HasChanges()) {
+ op->SetWaitCompletionFlag(true);
+ }
+
+ return EExecutionStatus::ExecutedNoMoreRestarts;
}
void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) {
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 6d9a0b5b459..59a90ced678 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -444,8 +444,7 @@ struct TOutputOpData {
TDelayedAcks DelayedAcks;
TOutReadSets OutReadSets;
TVector<THolder<TEvTxProcessing::TEvReadSet>> PreparedOutReadSets;
- // Updates and checked locks.
- TLocksUpdate LocksUpdate;
+ // Access log of checked locks
TLocksCache LocksAccessLog;
// Collected change records
TVector<TChangeRecord> ChangeRecords;
@@ -604,7 +603,6 @@ public:
return OutputDataRef().PreparedOutReadSets;
}
- TLocksUpdate &LocksUpdate() { return OutputDataRef().LocksUpdate; }
TLocksCache &LocksAccessLog() { return OutputDataRef().LocksAccessLog; }
TVector<TOutputOpData::TChangeRecord> &ChangeRecords() { return OutputDataRef().ChangeRecords; }
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index dcd4eb6eb94..47b71bc6f96 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -159,7 +159,6 @@ public:
bool IsHeadRead = false;
ui64 LockTxId = 0;
TLockInfo::TPtr Lock;
- bool ReportedLockBroken = false;
// note that will be always overwritten by values from request
NKikimrTxDataShard::EScanDataFormat Format = NKikimrTxDataShard::EScanDataFormat::CELLVEC;
diff --git a/ydb/core/tx/datashard/remove_locks.cpp b/ydb/core/tx/datashard/remove_locks.cpp
index f05ab6d7b81..d0204f18939 100644
--- a/ydb/core/tx/datashard/remove_locks.cpp
+++ b/ydb/core/tx/datashard/remove_locks.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
namespace NKikimr::NDataShard {
@@ -16,20 +17,11 @@ public:
TTxType GetTxType() const override { return TXTYPE_REMOVE_LOCK; }
bool Execute(TTransactionContext& txc, const TActorContext&) override {
- // Remove any uncommitted changes with this lock id
- // FIXME: if a distributed tx has already validated (and persisted)
- // its locks, we must preserve uncommitted changes even when lock is
- // removed on the originating node, since the final outcome may
- // actually decide to commit.
- for (const auto& pr : Self->GetUserTables()) {
- auto localTid = pr.second->LocalTid;
- if (txc.DB.HasOpenTx(localTid, LockId)) {
- txc.DB.RemoveTx(localTid, LockId);
- }
- }
-
// Remove the lock from memory, it's no longer needed
- Self->SysLocks.RemoveSubscribedLock(LockId);
+ // Note: locksDb will also remove uncommitted changes
+ // when removing a persistent lock.
+ TDataShardLocksDb locksDb(*Self, txc);
+ Self->SysLocks.RemoveSubscribedLock(LockId, &locksDb);
return true;
}
@@ -57,6 +49,10 @@ void TDataShard::Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorC
}
void TDataShard::SubscribeNewLocks(const TActorContext&) {
+ SubscribeNewLocks();
+}
+
+void TDataShard::SubscribeNewLocks() {
while (auto pendingSubscribeLock = SysLocks.NextPendingSubscribeLock()) {
Send(MakeLongTxServiceID(SelfId().NodeId()),
new TEvLongTxService::TEvSubscribeLock(
diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h
index 026ba14f41d..9cfd6bff8af 100644
--- a/ydb/core/tx/datashard/setup_sys_locks.h
+++ b/ydb/core/tx/datashard/setup_sys_locks.h
@@ -6,18 +6,41 @@
namespace NKikimr {
namespace NDataShard {
-struct TSetupSysLocks {
+struct TSetupSysLocks
+ : public TLocksUpdate
+{
TSysLocks &SysLocksTable;
- TSetupSysLocks(TOperation::TPtr op,
- TDataShard &self)
+ TSetupSysLocks(TDataShard& self, ILocksDb* db)
+ : SysLocksTable(self.SysLocksTable())
+ {
+ CheckVersion = TRowVersion::Min();
+ BreakVersion = TRowVersion::Min();
+
+ SysLocksTable.SetTxUpdater(this);
+ SysLocksTable.SetDb(db);
+ }
+
+ TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, const TRowVersion& readVersion,
+ TDataShard& self, ILocksDb* db)
: SysLocksTable(self.SysLocksTable())
{
- TLocksUpdate &update = op->LocksUpdate();
+ LockTxId = lockTxId;
+ LockNodeId = lockNodeId;
+ CheckVersion = readVersion;
+ BreakVersion = TRowVersion::Min();
- update.Clear();
- update.LockTxId = op->LockTxId();
- update.LockNodeId = op->LockNodeId();
+ SysLocksTable.SetTxUpdater(this);
+ SysLocksTable.SetDb(db);
+ }
+
+ TSetupSysLocks(TOperation::TPtr op,
+ TDataShard &self,
+ ILocksDb* db)
+ : SysLocksTable(self.SysLocksTable())
+ {
+ LockTxId = op->LockTxId();
+ LockNodeId = op->LockNodeId();
if (self.IsMvccEnabled()) {
auto [readVersion, writeVersion] = self.GetReadWriteVersions(op.Get());
@@ -29,21 +52,23 @@ struct TSetupSysLocks {
outOfOrder = true;
}
- update.CheckVersion = readVersion;
- update.BreakVersion = outOfOrder ? writeVersion : TRowVersion::Min();
+ CheckVersion = readVersion;
+ BreakVersion = outOfOrder ? writeVersion : TRowVersion::Min();
}
- SysLocksTable.SetTxUpdater(&update);
+ SysLocksTable.SetTxUpdater(this);
if (!op->LocksCache().Locks.empty())
SysLocksTable.SetCache(&op->LocksCache());
else
SysLocksTable.SetAccessLog(&op->LocksAccessLog());
+ SysLocksTable.SetDb(db);
}
~TSetupSysLocks() {
SysLocksTable.SetTxUpdater(nullptr);
SysLocksTable.SetCache(nullptr);
SysLocksTable.SetAccessLog(nullptr);
+ SysLocksTable.SetDb(nullptr);
}
};
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 05d887b602f..9db6a1eaef8 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1550,5 +1550,178 @@
"Blobs": 1
}
}
+ },
+ {
+ "TableId": 29,
+ "TableName": "Locks",
+ "TableKey": [
+ 1
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "LockId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "LockNodeId",
+ "ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "Generation",
+ "ColumnType": "Uint32"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "Counter",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "CreateTimestamp",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 6,
+ "ColumnName": "Flags",
+ "ColumnType": "Uint64"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ 6
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
+ "TableId": 30,
+ "TableName": "LockRanges",
+ "TableKey": [
+ 1,
+ 2
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "LockId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "RangeId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "PathOwnerId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "LocalPathId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "Flags",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 6,
+ "ColumnName": "Data",
+ "ColumnType": "String"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ 6
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
+ "TableId": 31,
+ "TableName": "LockConflicts",
+ "TableKey": [
+ 1,
+ 2
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "LockId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "ConflictId",
+ "ColumnType": "Uint64"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
}
] \ No newline at end of file