diff options
author | snaury <snaury@ydb.tech> | 2022-09-06 13:58:57 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-09-06 13:58:57 +0300 |
commit | 0a0b987a5acb6f6a7f9a7a11175903af9e61ad63 (patch) | |
tree | f17c244af7d5ab54e369d9e25cdea6cbc0de23b1 | |
parent | 4d2299379e613653fa653e340e899dee02174f57 (diff) | |
download | ydb-0a0b987a5acb6f6a7f9a7a11175903af9e61ad63.tar.gz |
Persistent write locks in datashard,
27 files changed, 1989 insertions, 589 deletions
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 4ba7b98e24d..62157ed5b3e 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -148,6 +148,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_change_sending.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_loans.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_locks_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_locks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_split_dst.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_split_src.cpp diff --git a/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp index cd8d3dab69a..0d088881c28 100644 --- a/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp @@ -2,6 +2,7 @@ #include "datashard_pipeline.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" +#include "datashard_locks_db.h" namespace NKikimr { namespace NDataShard { @@ -43,7 +44,8 @@ EExecutionStatus TBuildDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransactionContext &txc, const TActorContext &ctx) { - TSetupSysLocks guardLocks(op, DataShard); + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp index 60311f907ec..ba28e01ba29 100644 --- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp @@ -3,6 +3,7 @@ #include "datashard_pipeline.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" +#include "datashard_locks_db.h" #include <ydb/core/kqp/rm/kqp_rm.h> @@ -43,7 +44,8 @@ bool TBuildKqpDataTxOutRSUnit::IsReadyToExecute(TOperation::TPtr) const { EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) { - TSetupSysLocks guardLocks(op, DataShard); + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index d957d70e84d..f1f18dcf3ca 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -408,8 +408,11 @@ public: return; } - // TODO: handle presistent tx locks - if (!LockTxId) { + CheckWriteConflicts(tableId, row); + + if (LockTxId) { + Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId); + } else { Self->SysLocksTable().BreakLock(tableId, row); } Self->SetTableUpdateTime(tableId, Now); @@ -463,8 +466,11 @@ public: return; } - // TODO: handle persistent tx locks - if (!LockTxId) { + CheckWriteConflicts(tableId, row); + + if (LockTxId) { + Self->SysLocksTable().SetWriteLock(tableId, row, LockTxId, LockNodeId); + } else { Self->SysLocksTable().BreakLock(tableId, row); } @@ -524,23 +530,166 @@ public: if (TSysTables::IsSystemTable(tableId) || !LockTxId) return nullptr; - // Don't use tx map when we know there's no open tx with the given txId - if (!DB.HasOpenTx(LocalTableId(tableId), LockTxId)) { + // Don't use tx map when we know there's no write lock for a table + // Note: currently write lock implies uncommitted changes + if (!Self->SysLocksTable().HasWriteLock(LockTxId, tableId)) { return nullptr; } - // Uncommitted changes are visible in all possible snapshots - // TODO: we need to guarantee no other changes committed between snapshot read and our local changes - return new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); + auto& ptr = TxMaps[tableId]; + if (!ptr) { + // Uncommitted changes are visible in all possible snapshots + ptr = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); + } + + return ptr; } NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const override { - if (TSysTables::IsSystemTable(tableId)) + if (TSysTables::IsSystemTable(tableId) || !LockTxId) + return nullptr; + + if (!Self->SysLocksTable().HasWriteLocks(tableId)) { + // We don't have any active write locks, so there's nothing we + // could possibly conflict with. return nullptr; + } + + auto& ptr = TxObservers[tableId]; + if (!ptr) { + // This observer is supposed to find conflicts + ptr = new TReadTxObserver(this, tableId); + } + + return ptr; + } + + class TReadTxObserver : public NTable::ITransactionObserver { + public: + TReadTxObserver(const TDataShardEngineHost* host, const TTableId& tableId) + : Host(host) + , TableId(tableId) + { + Y_UNUSED(Host); + Y_UNUSED(TableId); + } + + void OnSkipUncommitted(ui64 txId) override { + Host->AddReadConflict(TableId, txId); + } + + void OnSkipCommitted(const TRowVersion&) override { + // We already use InvisibleRowSkips for these + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // We already use InvisibleRowSkips for these + } + + void OnApplyCommitted(const TRowVersion& rowVersion) override { + Host->CheckReadConflict(TableId, rowVersion); + } + + void OnApplyCommitted(const TRowVersion& rowVersion, ui64) override { + Host->CheckReadConflict(TableId, rowVersion); + } - // TODO: use observer to detect conflicts with other uncommitted transactions + private: + const TDataShardEngineHost* const Host; + const TTableId TableId; + }; - return nullptr; + void AddReadConflict(const TTableId& tableId, ui64 txId) const { + Y_UNUSED(tableId); + if (LockTxId) { + // We have detected uncommitted changes in txId that could affect + // our read result. We arrange a conflict that breaks our lock + // when txId commits. + Self->SysLocksTable().AddReadConflict(txId, LockTxId, LockNodeId); + } + } + + void CheckReadConflict(const TTableId& tableId, const TRowVersion& rowVersion) const { + Y_UNUSED(tableId); + if (rowVersion > ReadVersion) { + // We are reading from snapshot at ReadVersion and should not normally + // observe changes with a version above that. However, if we have an + // uncommitted change, that we fake as committed for our own changes + // visibility, we might shadow some change that happened after a + // snapshot. This is a clear indication of a conflict between read + // and that future conflict, hence we must break locks and abort. + // TODO: add an actual abort + Self->SysLocksTable().BreakSetLocks(LockTxId, LockNodeId); + } + } + + void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> row) { + if (!Self->SysLocksTable().HasWriteLocks(tableId)) { + // We don't have any active write locks, so there's nothing we + // could possibly conflict with. + return; + } + + const auto localTid = LocalTableId(tableId); + Y_VERIFY(localTid); + const TScheme::TTableInfo* tableInfo = Scheme.GetTableInfo(localTid); + TSmallVec<TRawTypeValue> key; + ConvertTableKeys(Scheme, tableInfo, row, key, nullptr); + + // We are not actually interested in the row version, we only need to + // detect uncommitted transaction skips on the path to that version. + auto res = Db.SelectRowVersion( + localTid, key, /* readFlags */ 0, + GetReadTxMap(tableId), + new TWriteTxObserver(this, tableId)); + + if (res.Ready == NTable::EReady::Page) { + throw TNotReadyTabletException(); + } + } + + class TWriteTxObserver : public NTable::ITransactionObserver { + public: + TWriteTxObserver(const TDataShardEngineHost* host, const TTableId& tableId) + : Host(host) + , TableId(tableId) + { + Y_UNUSED(Host); + Y_UNUSED(TableId); + } + + void OnSkipUncommitted(ui64 txId) override { + Host->AddWriteConflict(TableId, txId); + } + + void OnSkipCommitted(const TRowVersion&) override { + // nothing + } + + void OnSkipCommitted(const TRowVersion&, ui64) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&) override { + // nothing + } + + void OnApplyCommitted(const TRowVersion&, ui64) override { + // nothing + } + + private: + const TDataShardEngineHost* const Host; + const TTableId TableId; + }; + + void AddWriteConflict(const TTableId& tableId, ui64 txId) const { + Y_UNUSED(tableId); + if (LockTxId) { + Self->SysLocksTable().AddWriteConflict(txId, LockTxId, LockNodeId); + } else { + Self->SysLocksTable().BreakLock(txId); + } } private: @@ -558,6 +707,8 @@ private: TRowVersion WriteVersion = TRowVersion::Max(); TRowVersion ReadVersion = TRowVersion::Min(); mutable THashMap<TTableId, THolder<IChangeCollector>> ChangeCollectors; + mutable THashMap<TTableId, NTable::ITransactionMapPtr> TxMaps; + mutable THashMap<TTableId, NTable::ITransactionObserverPtr> TxObservers; }; // diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index dc12c49959b..e5438459289 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -1,4 +1,5 @@ #include "datashard_txs.h" +#include "datashard_locks_db.h" #include <ydb/core/base/tx_processing.h> #include <ydb/core/tablet/tablet_exception.h> @@ -164,6 +165,9 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { PRECHARGE_SYS_TABLE(Schema::DstReplicationSourceOffsetsReceived); PRECHARGE_SYS_TABLE(Schema::UserTablesStats); PRECHARGE_SYS_TABLE(Schema::SchemaSnapshots); + PRECHARGE_SYS_TABLE(Schema::Locks); + PRECHARGE_SYS_TABLE(Schema::LockRanges); + PRECHARGE_SYS_TABLE(Schema::LockConflicts); if (!ready) return false; @@ -479,6 +483,15 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { } } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) { + TDataShardLocksDb locksDb(*Self, txc); + if (!Self->SysLocks.Load(locksDb)) { + return false; + } + } + + Self->SubscribeNewLocks(); + return true; } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index cc0c87c6ee3..3145f6f50ec 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1,5 +1,7 @@ #include "datashard_impl.h" #include "datashard_read_operation.h" +#include "setup_sys_locks.h" +#include "datashard_locks_db.h" #include <ydb/core/formats/arrow_batch_builder.h> @@ -789,16 +791,20 @@ public: } } + bool hadWrites = false; + if (Request->Record.HasLockTxId()) { // note that we set locks only when first read finish transaction, // i.e. we have read something without page faults - AcquireLock(ctx, state); + hadWrites |= AcquireLock(txc, ctx, state); } - if (!Self->IsFollower()) - Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, readType, txc); + if (!Self->IsFollower()) { + auto res = Self->PromoteImmediatePostExecuteEdges(state.ReadVersion, readType, txc); + hadWrites |= res.HadWrites; + } - return EExecutionStatus::DelayComplete; + return hadWrites ? EExecutionStatus::DelayCompleteNoMoreRestarts : EExecutionStatus::DelayComplete; } void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override { @@ -1268,17 +1274,18 @@ private: ValidationInfo.Loaded = true; } - void AcquireLock(const TActorContext& ctx, TReadIteratorState& state) { + bool AcquireLock(TTransactionContext& txc, const TActorContext& ctx, TReadIteratorState& state) { auto& sysLocks = Self->SysLocksTable(); - auto& locker = sysLocks.GetLocker(); const auto lockTxId = state.Request->Record.GetLockTxId(); const auto lockNodeId = state.Request->Record.GetLockNodeId(); TTableId tableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion); - TLockInfo::TPtr lock; state.LockTxId = lockTxId; + TDataShardLocksDb locksDb(*Self, txc); + TSetupSysLocks guard(lockTxId, lockNodeId, state.ReadVersion, *Self, &locksDb); + if (!state.Request->Keys.empty()) { for (size_t i = 0; i < state.Request->Keys.size(); ++i) { const auto& key = state.Request->Keys[i]; @@ -1289,62 +1296,47 @@ private: true, key.GetCells(), true); - TRangeKey rangeKey = locker.MakeRange(tableId, lockRange); - lock = locker.AddRangeLock(lockTxId, lockNodeId, rangeKey, state.ReadVersion); + sysLocks.SetLock(tableId, lockRange, lockTxId, lockNodeId); } else { - TPointKey pointKey = locker.MakePoint(tableId, key.GetCells()); - lock = locker.AddPointLock(lockTxId, lockNodeId, pointKey, state.ReadVersion); + sysLocks.SetLock(tableId, key.GetCells(), lockTxId, lockNodeId); } } } else { - // since no keys, then we must have ranges (has been checked initially) + // no keys, so we must have ranges (has been checked initially) for (size_t i = 0; i < state.Request->Ranges.size(); ++i) { auto range = state.Request->Ranges[i].ToTableRange(); - TRangeKey rangeKey = locker.MakeRange(tableId, range); - lock = locker.AddRangeLock(lockTxId, lockNodeId, rangeKey, state.ReadVersion); + sysLocks.SetLock(tableId, range, lockTxId, lockNodeId); } } - ui64 counter; - ui64 lockId; - bool isBroken; - if (lock) { - counter = lock->GetCounter(state.ReadVersion); - lockId = lock->GetLockId(); - isBroken = lock->IsBroken(state.ReadVersion); - } else { - counter = TSysTables::TLocksTable::TLock::ErrorNotSet; - lockId = lockTxId; - isBroken = true; - } - - if (!isBroken && Reader->HasInvisibleRowSkips()) { - locker.BreakLock(lockTxId, TRowVersion::Min()); - isBroken = true; - counter = TSysTables::TLocksTable::TLock::ErrorAlreadyBroken; + if (Reader->HasInvisibleRowSkips()) { + sysLocks.BreakSetLocks(lockTxId, lockNodeId); } - sysLocks.UpdateCounters(counter); + auto locks = sysLocks.ApplyLocks(); - NKikimrTxDataShard::TLock *addLock; - if (!isBroken) { - addLock = Result->Record.AddTxLocks(); - } else { - addLock = Result->Record.AddBrokenTxLocks(); - } + for (auto& lock : locks) { + NKikimrTxDataShard::TLock* addLock; + if (lock.IsError()) { + addLock = Result->Record.AddBrokenTxLocks(); + } else { + addLock = Result->Record.AddTxLocks(); + } - addLock->SetLockId(lockId); - addLock->SetDataShard(Self->TabletID()); - addLock->SetGeneration(Self->Generation()); - addLock->SetCounter(counter); - addLock->SetSchemeShard(state.PathId.OwnerId); - addLock->SetPathId(state.PathId.LocalPathId); + addLock->SetLockId(lock.LockId); + addLock->SetDataShard(lock.DataShard); + addLock->SetGeneration(lock.Generation); + addLock->SetCounter(lock.Counter); + addLock->SetSchemeShard(lock.SchemeShard); + addLock->SetPathId(lock.PathId); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() - << " Acquired lock# " << lockId << ", counter# " << counter - << " for " << state.PathId); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() + << " Acquired lock# " << lock.LockId << ", counter# " << lock.Counter + << " for " << state.PathId); + } - state.Lock = lock; // note that might be nullptr + state.Lock = guard.Lock; // will be nullptr if broken + return locksDb.HasChanges(); } }; @@ -1527,7 +1519,7 @@ public: Result->Record, Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Unknown table id: " << state.PathId.LocalPathId); - SendResult(ctx); + SendResult(txc, ctx); return true; } auto userTableInfo = it->second; @@ -1539,7 +1531,7 @@ public: Ydb::StatusIds::SCHEME_ERROR, TStringBuilder() << "Schema changed, current " << currentSchemaVersion << ", requested table schemaversion " << state.SchemaVersion); - SendResult(ctx); + SendResult(txc, ctx); return true; } @@ -1552,7 +1544,7 @@ public: Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Failed to get scheme for table local id: " << state.PathId.LocalPathId); - SendResult(ctx); + SendResult(txc, ctx); return true; } TableInfo = TShortTableInfo(state.PathId.LocalPathId, *schema); @@ -1575,7 +1567,7 @@ public: << state.ReadVersion << " shard " << Self->TabletID() << " with lowWatermark " << Self->GetSnapshotManager().GetLowWatermark() << (Self->IsFollower() ? " RO replica" : "")); - SendResult(ctx); + SendResult(txc, ctx); return true; } @@ -1586,7 +1578,7 @@ public: Result->Record, Ydb::StatusIds::BAD_REQUEST, p.second); - SendResult(ctx); + SendResult(txc, ctx); return true; } std::swap(BlockBuilder, p.first); @@ -1600,7 +1592,7 @@ public: Reader.reset(new TReader(state, *BlockBuilder, TableInfo)); if (Reader->Read(txc, ctx)) { - SendResult(ctx); + SendResult(txc, ctx); return true; } return false; @@ -1610,7 +1602,7 @@ public: // nothing to do } - void SendResult(const TActorContext& ctx) { + void SendResult(TTransactionContext& txc, const TActorContext& ctx) { const auto* request = Ev->Get(); TReadIteratorId readId(request->Reader, request->ReadId); auto it = Self->ReadIterators.find(readId); @@ -1642,38 +1634,31 @@ public: return; } - if (state.Lock && !state.ReportedLockBroken) { - bool isBroken = false; - ui64 counter; - ui64 lockId; - if (state.Lock->IsBroken(state.ReadVersion)) { - isBroken = true; - counter = state.Lock->GetCounter(state.ReadVersion); - lockId = state.Lock->GetLockId(); - - } else if (Reader->HasInvisibleRowSkips()) { - isBroken = true; - counter = TSysTables::TLocksTable::TLock::ErrorBroken; - lockId = state.LockTxId; - + if (state.Lock) { + bool isBroken = state.Lock->IsBroken(state.ReadVersion); + if (!isBroken && Reader->HasInvisibleRowSkips()) { auto& sysLocks = Self->SysLocksTable(); - auto& locker = sysLocks.GetLocker(); - locker.BreakLock(state.LockTxId, TRowVersion::Min()); - sysLocks.UpdateCounters(counter); + TDataShardLocksDb locksDb(*Self, txc); + TSetupSysLocks guard(*Self, &locksDb); + sysLocks.BreakLock(state.Lock->GetLockId()); + sysLocks.ApplyLocks(); + Y_VERIFY(state.Lock->IsBroken()); + isBroken = true; } if (isBroken) { - state.ReportedLockBroken = true; NKikimrTxDataShard::TLock *addLock = record.AddBrokenTxLocks(); - addLock->SetLockId(lockId); + addLock->SetLockId(state.Lock->GetLockId()); addLock->SetDataShard(Self->TabletID()); - addLock->SetGeneration(Self->Generation()); - addLock->SetCounter(counter); + addLock->SetGeneration(state.Lock->GetGeneration()); + addLock->SetCounter(state.Lock->GetCounter(state.ReadVersion)); addLock->SetSchemeShard(state.PathId.OwnerId); addLock->SetPathId(state.PathId.LocalPathId); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId - << " TTxReadContinue::Execute() found broken lock# " << lockId); + << " TTxReadContinue::Execute() found broken lock# " << state.Lock->GetLockId()); + + state.Lock = nullptr; } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index def43a6bff3..1a98d9ab77b 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -267,6 +267,7 @@ class TDataShard friend class TS3DownloadsManager; friend class TS3Downloader; friend struct TSetupSysLocks; + friend class TDataShardLocksDb; friend class TTxStartMvccStateChange; friend class TTxExecuteMvccStateChange; @@ -776,12 +777,44 @@ class TDataShard using TColumns = TableColumns<PathOwnerId, LocalPathId, SchemaVersion, Step, TxId, Schema>; }; + struct Locks : Table<29> { + struct LockId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct LockNodeId : Column<2, NScheme::NTypeIds::Uint32> {}; + struct Generation : Column<3, NScheme::NTypeIds::Uint32> {}; + struct Counter : Column<4, NScheme::NTypeIds::Uint64> {}; + struct CreateTimestamp : Column<5, NScheme::NTypeIds::Uint64> {}; + struct Flags : Column<6, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<LockId>; + using TColumns = TableColumns<LockId, LockNodeId, Generation, Counter, CreateTimestamp, Flags>; + }; + + struct LockRanges : Table<30> { + struct LockId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct RangeId : Column<2, NScheme::NTypeIds::Uint64> {}; + struct PathOwnerId : Column<3, NScheme::NTypeIds::Uint64> {}; + struct LocalPathId : Column<4, NScheme::NTypeIds::Uint64> {}; + struct Flags : Column<5, NScheme::NTypeIds::Uint64> {}; + struct Data : Column<6, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<LockId, RangeId>; + using TColumns = TableColumns<LockId, RangeId, PathOwnerId, LocalPathId, Flags, Data>; + }; + + struct LockConflicts : Table<31> { + struct LockId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct ConflictId : Column<2, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<LockId, ConflictId>; + using TColumns = TableColumns<LockId, ConflictId>; + }; + using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue, DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress, Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts, SrcChangeSenderActivations, DstChangeSenderActivations, ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived, - UserTablesStats, SchemaSnapshots>; + UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts>; // These settings are persisted on each Init. So we use empty settings in order not to overwrite what // was changed by the user @@ -1558,6 +1591,7 @@ public: void ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, const TActorContext &ctx); void SubscribeNewLocks(const TActorContext &ctx); + void SubscribeNewLocks(); private: /// diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index b49930a683c..9a784c24545 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -695,39 +695,6 @@ void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataS LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLockChanges: committing txId# " << txId << " in localTid# " << localTid); txc.DB.CommitTx(localTid, txId); } - } else { - KqpRollbackLockChanges(origin, tx, dataShard, txc); - } -} - -void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) { - auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); - - if (!kqpTx.HasLocks()) { - return; - } - - if (NeedEraseLocks(kqpTx.GetLocks().GetOp())) { - for (auto& lockProto : kqpTx.GetLocks().GetLocks()) { - if (lockProto.GetDataShard() != origin) { - continue; - } - - TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId()); - auto localTid = dataShard.GetLocalTableId(tableId); - if (!localTid) { - // It may have been dropped already - continue; - } - - auto txId = lockProto.GetLockId(); - if (!txc.DB.HasOpenTx(localTid, txId)) { - continue; - } - - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpRollbackLockChanges: removing txId# " << txId << " from localTid# " << localTid); - txc.DB.RemoveTx(localTid, txId); - } } } diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index 452fc92d337..1f45d28a1fe 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -38,7 +38,6 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc); -void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc); void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters); diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index b83770b18e1..2db6e835171 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -393,16 +393,12 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con TouchTablePoint(tableId, key); Shard->GetKeyAccessSampler()->AddSample(tableId, key); - NTable::ITransactionMapPtr txMap; - if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) { - txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); - } - // TODO: tx observer - NTable::TRowState dbRow; NTable::TSelectStats stats; ui64 flags = EngineHost.GetSettings().DisableByKeyFilter ? (ui64) NTable::NoByKey : 0; - auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion(), txMap); + auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion(), + EngineHost.GetReadTxMap(tableId), + EngineHost.GetReadTxObserver(tableId)); kqpStats.NSelectRow = 1; kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips; @@ -454,14 +450,10 @@ TAutoPtr<NTable::TTableIt> TKqpDatashardComputeContext::CreateIterator(const TTa keyRange.MinInclusive = range.InclusiveFrom; keyRange.MaxInclusive = range.InclusiveTo; - NTable::ITransactionMapPtr txMap; - if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) { - txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); - } - // TODO: tx observer - TouchTableRange(tableId, range); - return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion(), txMap); + return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion(), + EngineHost.GetReadTxMap(tableId), + EngineHost.GetReadTxObserver(tableId)); } TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIterator(const TTableId& tableId, @@ -481,14 +473,10 @@ TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIter keyRange.MinInclusive = range.InclusiveFrom; keyRange.MaxInclusive = range.InclusiveTo; - NTable::ITransactionMapPtr txMap; - if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) { - txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); - } - // TODO: tx observer - TouchTableRange(tableId, range); - return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion(), txMap); + return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion(), + EngineHost.GetReadTxMap(tableId), + EngineHost.GetReadTxObserver(tableId)); } template <typename TReadTableIterator> diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index 7b4181b0bc1..7385b514a7f 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -14,23 +14,54 @@ TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId) : Locker(locker) , LockId(lockId) , LockNodeId(lockNodeId) + , Generation(locker->Generation()) , Counter(locker->IncCounter()) , CreationTime(TAppData::TimeProvider->Now()) {} +TLockInfo::TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) + : Locker(locker) + , LockId(lockId) + , LockNodeId(lockNodeId) + , Generation(generation) + , Counter(counter) + , CreationTime(createTs) + , Persistent(true) +{ + if (counter == Max<ui64>()) { + BreakVersion.emplace(TRowVersion::Min()); + } +} + TLockInfo::~TLockInfo() { - // nothing + if (!ConflictLocks.empty()) { + for (auto& pr : ConflictLocks) { + // Ensure there are no dangling pointers + pr.first->ConflictLocks.erase(this); + } + ConflictLocks.clear(); + } } -void TLockInfo::AddShardLock(const THashSet<TPathId>& affectedTables) { - AffectedTables.insert(affectedTables.begin(), affectedTables.end()); +void TLockInfo::MakeShardLock() { ShardLock = true; Points.clear(); Ranges.clear(); } +bool TLockInfo::AddShardLock(const TPathId& pathId) { + Y_VERIFY(ShardLock); + if (ReadTables.insert(pathId).second) { + UnpersistedRanges = true; + return true; + } + return false; +} + bool TLockInfo::AddPoint(const TPointKey& point) { - AffectedTables.insert(point.Table->GetTableId()); + if (ReadTables.insert(point.Table->GetTableId()).second) { + UnpersistedRanges = true; + } if (!ShardLock) { Points.emplace_back(point); } @@ -38,32 +69,217 @@ bool TLockInfo::AddPoint(const TPointKey& point) { } bool TLockInfo::AddRange(const TRangeKey& range) { - AffectedTables.insert(range.Table->GetTableId()); + if (ReadTables.insert(range.Table->GetTableId()).second) { + UnpersistedRanges = true; + } if (!ShardLock) { Ranges.emplace_back(range); } return !ShardLock; } -void TLockInfo::SetBroken(const TRowVersion& at) { -#if 1 // optimisation: remove at next Remove - if (!IsBroken(at)) - Locker->ScheduleLockCleanup(LockId, at); -#endif +bool TLockInfo::AddWriteLock(const TPathId& pathId) { + if (WriteTables.insert(pathId).second) { + UnpersistedRanges = true; + return true; + } + return false; +} - if (!BreakVersion || at < *BreakVersion) - BreakVersion.emplace(at.Step, at.TxId); +void TLockInfo::SetBroken(TRowVersion at) { + if (Persistent) { + // Persistent locks always break completely + at = TRowVersion::Min(); + } - if (at) - return; // if break version is not TrowVersion::Min() we will postpone actual ranges removal + if (!IsBroken(at)) { + BreakVersion = at; + Locker->ScheduleRemoveBrokenRanges(LockId, at); - Points.clear(); - Ranges.clear(); + if (!at) { + // This lock is now broken in all versions, clear as soon as possible + Counter = Max<ui64>(); + Points.clear(); + Ranges.clear(); + Locker->ScheduleBrokenLock(this); + } + } +} + +void TLockInfo::PersistLock(ILocksDb* db) { + Y_VERIFY(!IsPersistent()); + Y_VERIFY(db, "Cannot persist lock without a db"); + db->PersistAddLock(LockId, LockNodeId, Generation, Counter, CreationTime.MicroSeconds()); + Persistent = true; + + PersistRanges(db); + PersistConflicts(db); +} + +void TLockInfo::PersistBrokenLock(ILocksDb* db) { + Y_VERIFY(IsPersistent()); + Y_VERIFY(db, "Cannot persist lock without a db"); + db->PersistLockCounter(LockId, Max<ui64>()); +} + +void TLockInfo::PersistRemoveLock(ILocksDb* db) { + Y_VERIFY(IsPersistent()); + Y_VERIFY(db, "Cannot persist lock without a db"); + + // Remove persistent conflicts + for (auto& pr : ConflictLocks) { + TLockInfo* otherLock = pr.first; + if (otherLock->IsPersistent()) { + if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) { + db->PersistRemoveConflict(LockId, otherLock->LockId); + } + if (!!(pr.second & ELockConflictFlags::BreakUsOnTheirCommit)) { + db->PersistRemoveConflict(otherLock->LockId, LockId); + } + } + otherLock->ConflictLocks.erase(this); + } + ConflictLocks.clear(); + + // Remove persistent ranges + for (auto& range : PersistentRanges) { + db->PersistRemoveRange(LockId, range.Id); + } + PersistentRanges.clear(); + + // Remove the lock itself + db->PersistRemoveLock(LockId); +} + +void TLockInfo::PersistRanges(ILocksDb* db) { + Y_VERIFY(IsPersistent()); + if (UnpersistedRanges) { + for (const TPathId& pathId : ReadTables) { + PersistAddRange(pathId, ELockRangeFlags::Read, db); + } + for (const TPathId& pathId : WriteTables) { + PersistAddRange(pathId, ELockRangeFlags::Write, db); + } + UnpersistedRanges = false; + } +} + +void TLockInfo::PersistAddRange(const TPathId& tableId, ELockRangeFlags flags, ILocksDb* db) { + Y_VERIFY(IsPersistent()); + Y_VERIFY(db, "Cannot persist ranges without a db"); + // We usually have a single range with flags, so linear search is ok + ui64 maxId = 0; + for (auto& range : PersistentRanges) { + if (range.TableId == tableId) { + auto prevFlags = range.Flags; + range.Flags |= flags; + if (range.Flags != prevFlags) { + db->PersistRangeFlags(LockId, range.Id, ui64(range.Flags)); + } + return; + } + maxId = Max(maxId, range.Id); + } + auto& range = PersistentRanges.emplace_back(); + range.Id = maxId + 1; + range.TableId = tableId; + range.Flags = flags; + db->PersistAddRange(LockId, range.Id, range.TableId, ui64(range.Flags)); +} + +void TLockInfo::AddConflict(TLockInfo* otherLock, ILocksDb* db) { + Y_VERIFY(this != otherLock, "Lock cannot conflict with itself"); + Y_VERIFY(LockId != otherLock->LockId, "Unexpected conflict between a pair of locks with the same id"); + + auto& flags = ConflictLocks[otherLock]; + if (!(flags & ELockConflictFlags::BreakThemOnOurCommit)) { + flags |= ELockConflictFlags::BreakThemOnOurCommit; + auto& otherFlags = otherLock->ConflictLocks[this]; + otherFlags |= ELockConflictFlags::BreakUsOnTheirCommit; + if (IsPersistent() && otherLock->IsPersistent()) { + // Any conflict between persistent locks is also persistent + Y_VERIFY(db, "Cannot persist conflicts without a db"); + db->PersistAddConflict(LockId, otherLock->LockId); + } + } +} + +void TLockInfo::PersistConflicts(ILocksDb* db) { + Y_VERIFY(IsPersistent()); + Y_VERIFY(db, "Cannot persist conflicts without a db"); + for (auto& pr : ConflictLocks) { + TLockInfo* otherLock = pr.first; + if (!otherLock->IsPersistent()) { + // We don't persist non-persistent conflicts + continue; + } + if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) { + db->PersistAddConflict(LockId, otherLock->LockId); + } + if (!!(pr.second & ELockConflictFlags::BreakUsOnTheirCommit)) { + db->PersistAddConflict(otherLock->LockId, LockId); + } + } +} + +void TLockInfo::CleanupConflicts() { + if (IsPersistent()) { + for (auto it = ConflictLocks.begin(); it != ConflictLocks.end();) { + TLockInfo* otherLock = it->first; + if (otherLock->IsPersistent()) { + // We keep persistent conflict in memory until lock is removed + ++it; + } else { + otherLock->ConflictLocks.erase(this); + ConflictLocks.erase(it++); + } + } + } else { + for (auto& pr : ConflictLocks) { + TLockInfo* otherLock = pr.first; + otherLock->ConflictLocks.erase(this); + } + ConflictLocks.clear(); + } +} + +void TLockInfo::RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags) { + auto& range = PersistentRanges.emplace_back(); + range.Id = rangeId; + range.TableId = tableId; + range.Flags = flags; + + if (!!(range.Flags & ELockRangeFlags::Read)) { + if (ReadTables.insert(range.TableId).second) { + ShardLock = true; + if (auto* table = Locker->FindTablePtr(range.TableId)) { + table->AddShardLock(this); + } + } + } + if (!!(range.Flags & ELockRangeFlags::Write)) { + if (WriteTables.insert(range.TableId).second) { + if (auto* table = Locker->FindTablePtr(range.TableId)) { + table->AddWriteLock(this); + } + } + } +} + +void TLockInfo::RestorePersistentConflict(TLockInfo* otherLock) { + Y_VERIFY(IsPersistent() && otherLock->IsPersistent()); + + this->ConflictLocks[otherLock] |= ELockConflictFlags::BreakThemOnOurCommit; + otherLock->ConflictLocks[this] |= ELockConflictFlags::BreakUsOnTheirCommit; } // TTableLocks -void TTableLocks::AddPointLock(const TPointKey& point, const TLockInfo::TPtr& lock) { +void TTableLocks::AddShardLock(TLockInfo* lock) { + ShardLocks.insert(lock); +} + +void TTableLocks::AddPointLock(const TPointKey& point, TLockInfo* lock) { Y_VERIFY(lock->MayHavePointsAndRanges()); Y_VERIFY(point.Table == this); TRangeTreeBase::TOwnedRange added( @@ -71,10 +287,10 @@ void TTableLocks::AddPointLock(const TPointKey& point, const TLockInfo::TPtr& lo true, point.Key, true); - Ranges.AddRange(std::move(added), lock.Get()); + Ranges.AddRange(std::move(added), lock); } -void TTableLocks::AddRangeLock(const TRangeKey& range, const TLockInfo::TPtr& lock) { +void TTableLocks::AddRangeLock(const TRangeKey& range, TLockInfo* lock) { Y_VERIFY(lock->MayHavePointsAndRanges()); Y_VERIFY(range.Table == this); // FIXME: we have to force empty From/To to be inclusive due to outdated @@ -88,60 +304,100 @@ void TTableLocks::AddRangeLock(const TRangeKey& range, const TLockInfo::TPtr& lo range.InclusiveFrom || !range.From, range.To, range.InclusiveTo || !range.To); - Ranges.AddRange(std::move(added), lock.Get()); + Ranges.AddRange(std::move(added), lock); } -void TTableLocks::RemoveLock(const TLockInfo::TPtr& lock) { - Ranges.RemoveRanges(lock.Get()); +void TTableLocks::AddWriteLock(TLockInfo* lock) { + WriteLocks.insert(lock); } -void TTableLocks::BreakLocks(TConstArrayRef<TCell> key, const TRowVersion& at) { - Ranges.EachIntersection(key, [&](const TRangeTreeBase::TRange&, TLockInfo* lock) { +void TTableLocks::RemoveReadLock(TLockInfo* lock) { + if (lock->IsShardLock()) { + RemoveShardLock(lock); + } else { + RemoveRangeLock(lock); + } +} + +void TTableLocks::RemoveShardLock(TLockInfo* lock) { + ShardLocks.erase(lock); +} + +void TTableLocks::RemoveRangeLock(TLockInfo* lock) { + Ranges.RemoveRanges(lock); +} + +void TTableLocks::RemoveWriteLock(TLockInfo* lock) { + WriteLocks.erase(lock); +} + +bool TTableLocks::BreakShardLocks(const TRowVersion& at) { + bool broken = false; + for (TLockInfo* lock : ShardLocks) { lock->SetBroken(at); - }); + broken = true; + } + return broken; } -void TTableLocks::BreakAllLocks(const TRowVersion& at) { +bool TTableLocks::BreakAllLocks(const TRowVersion& at) { + bool broken = false; + for (TLockInfo* lock : ShardLocks) { + lock->SetBroken(at); + broken = true; + } Ranges.EachRange([&](const TRangeTreeBase::TRange&, TLockInfo* lock) { lock->SetBroken(at); + broken = true; }); + return broken; } // TLockLocker -TLockInfo::TPtr TLockLocker::AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at) { - TLockInfo::TPtr lock = GetOrAddLock(lockTxId, lockNodeId); - if (!lock || lock->IsBroken(at)) - return lock; - - ShardLocks.insert(lockTxId); - for (const TPathId& tableId : lock->GetAffectedTables()) { - Tables.at(tableId)->RemoveLock(lock); +void TLockLocker::AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key) { + if (lock->AddPoint(key)) { + key.Table->AddPointLock(key, lock.Get()); + } else { + key.Table->AddShardLock(lock.Get()); } - lock->AddShardLock(affectedTables); - return lock; } -TLockInfo::TPtr TLockLocker::AddPointLock(ui64 lockId, ui32 lockNodeId, const TPointKey& point, const TRowVersion& at) { - TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId); - if (!lock || lock->IsBroken(at)) - return lock; - - if (lock->AddPoint(point)) { - point.Table->AddPointLock(point, lock); +void TLockLocker::AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key) { + if (lock->AddRange(key)) { + key.Table->AddRangeLock(key, lock.Get()); + } else { + key.Table->AddShardLock(lock.Get()); } - return lock; } -TLockInfo::TPtr TLockLocker::AddRangeLock(ui64 lockId, ui32 lockNodeId, const TRangeKey& range, const TRowVersion& at) { - TLockInfo::TPtr lock = GetOrAddLock(lockId, lockNodeId); - if (!lock || lock->IsBroken(at)) - return lock; +void TLockLocker::AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) { + if (!lock->IsShardLock()) { + for (const TPathId& tableId : lock->GetReadTables()) { + Tables.at(tableId)->RemoveRangeLock(lock.Get()); + } + lock->MakeShardLock(); + for (const TPathId& tableId : lock->GetReadTables()) { + Tables.at(tableId)->AddShardLock(lock.Get()); + } + } + for (auto& table : readTables) { + const TPathId& tableId = table.GetTableId(); + Y_VERIFY(Tables.at(tableId).Get() == &table); + if (lock->AddShardLock(tableId)) { + table.AddShardLock(lock.Get()); + } + } +} - if (lock->AddRange(range)) { - range.Table->AddRangeLock(range, lock); +void TLockLocker::AddWriteLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksWriteListTag>& writeTables) { + for (auto& table : writeTables) { + const TPathId& tableId = table.GetTableId(); + Y_VERIFY(Tables.at(tableId).Get() == &table); + if (lock->AddWriteLock(tableId)) { + table.AddWriteLock(lock.Get()); + } } - return lock; } TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& at) const { @@ -154,30 +410,28 @@ TLockInfo::TPtr TLockLocker::GetLock(ui64 lockTxId, const TRowVersion& at) const return nullptr; } -void TLockLocker::BreakShardLocks(const TRowVersion& at) { - for (ui64 lockId : ShardLocks) { - auto it = Locks.find(lockId); - if (it != Locks.end()) { - it->second->SetBroken(at); - } +void TLockLocker::BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& locks, const TRowVersion& at) { + for (auto& lock : locks) { + lock.SetBroken(at); } - if (!at) - ShardLocks.clear(); + RemoveBrokenRanges(); } -void TLockLocker::BreakLocks(const TPointKey& point, const TRowVersion& at) { - point.Table->BreakLocks(point.Key, at); +void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at) { + for (auto& table : tables) { + table.BreakShardLocks(at); + } + RemoveBrokenRanges(); } -void TLockLocker::BreakAllLocks(const TPathId& pathId, const TRowVersion& at) { - auto it = Tables.find(pathId); - if (it != Tables.end()) { - it->second->BreakAllLocks(at); - RemoveBrokenRanges(); - +void TLockLocker::BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at) { + for (auto& table : tables) { + table.BreakAllLocks(at); } + + RemoveBrokenRanges(); } void TLockLocker::RemoveBrokenRanges() { @@ -186,13 +440,13 @@ void TLockLocker::RemoveBrokenRanges() { if (it != Locks.end()) { const TLockInfo::TPtr& lock = it->second; - if (!lock->IsShardLock()) { - for (const TPathId& tableId : lock->GetAffectedTables()) { - Tables.at(tableId)->RemoveLock(lock); - } - } else { - ShardLocks.erase(lockId); + for (const TPathId& tableId : lock->GetReadTables()) { + Tables.at(tableId)->RemoveReadLock(lock.Get()); } + for (const TPathId& tableId : lock->GetWriteTables()) { + Tables.at(tableId)->RemoveWriteLock(lock.Get()); + } + lock->CleanupConflicts(); } } CleanupPending.clear(); @@ -209,17 +463,24 @@ void TLockLocker::RemoveBrokenRanges() { if (it != Locks.end()) { const TLockInfo::TPtr& lock = it->second; + if (lock->Counter == Max<ui64>()) { + // Skip locks that have been cleaned up already + continue; + } + lock->BreakVersion = TRowVersion::Min(); + lock->Counter = Max<ui64>(); lock->Points.clear(); lock->Ranges.clear(); + ScheduleBrokenLock(lock.Get()); - if (!lock->IsShardLock()) { - for (const TPathId& tableId : lock->GetAffectedTables()) { - Tables.at(tableId)->RemoveLock(lock); - } - } else { - ShardLocks.erase(lockId); + for (const TPathId& tableId : lock->GetReadTables()) { + Tables.at(tableId)->RemoveReadLock(lock.Get()); + } + for (const TPathId& tableId : lock->GetWriteTables()) { + Tables.at(tableId)->RemoveWriteLock(lock.Get()); } + lock->CleanupConflicts(); } } } @@ -246,7 +507,18 @@ TLockInfo::TPtr TLockLocker::GetOrAddLock(ui64 lockId, ui32 lockNodeId) { return lock; } -void TLockLocker::RemoveOneLock(ui64 lockTxId) { +TLockInfo::TPtr TLockLocker::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) { + Y_VERIFY(Locks.find(lockId) == Locks.end()); + + TLockInfo::TPtr lock = Limiter.AddLock(lockId, lockNodeId, generation, counter, createTs); + Locks[lockId] = lock; + if (lockNodeId) { + PendingSubscribeLocks.emplace_back(lockId, lockNodeId); + } + return lock; +} + +void TLockLocker::RemoveOneLock(ui64 lockTxId, ILocksDb* db) { auto it = Locks.find(lockTxId); if (it != Locks.end()) { TLockInfo::TPtr txLock = it->second; @@ -257,44 +529,48 @@ void TLockLocker::RemoveOneLock(ui64 lockTxId) { Self->IncCounter(COUNTER_LOCKS_REMOVED); } - if (!txLock->IsShardLock()) { - for (const TPathId& tableId : txLock->GetAffectedTables()) { - Tables.at(tableId)->RemoveLock(txLock); - } - } else { - ShardLocks.erase(lockTxId); + if (txLock->InBrokenLocks) { + BrokenLocks.Remove(txLock.Get()); + --BrokenLocksCount_; + } + + for (const TPathId& tableId : txLock->GetReadTables()) { + Tables.at(tableId)->RemoveReadLock(txLock.Get()); } + for (const TPathId& tableId : txLock->GetWriteTables()) { + Tables.at(tableId)->RemoveWriteLock(txLock.Get()); + } + txLock->CleanupConflicts(); Limiter.RemoveLock(lockTxId); Locks.erase(it); + if (txLock->IsPersistent()) { + if (db) { + txLock->PersistRemoveLock(db); + } else { + Y_VERIFY(txLock->IsBroken(), "Scheduling persistent lock removal that is not broken"); + RemovedPersistentLocks.push_back(txLock); + } + } } } void TLockLocker::RemoveBrokenLocks() { - for (ui64 lockId : BrokenLocks) { - RemoveOneLock(lockId); - } - BrokenLocks.clear(); - - if (BrokenCandidates.empty()) - return; - - auto till = Self->LastCompleteTxVersion(); - while (!BrokenCandidates.empty() && BrokenCandidates.top().Version <= till) { - RemoveOneLock(BrokenCandidates.top().LockId); - BrokenCandidates.pop(); + RemoveBrokenRanges(); + while (BrokenLocks) { + auto* lock = BrokenLocks.PopFront(); + RemoveOneLock(lock->GetLockId()); } } -void TLockLocker::BreakLock(ui64 lockId, const TRowVersion& at) { - if (auto lock = GetLock(lockId, at)) - lock->SetBroken(at); - - RemoveBrokenLocks(); +void TLockLocker::ForceBreakLock(ui64 lockId) { + if (auto lock = GetLock(lockId, TRowVersion::Min())) { + lock->SetBroken(TRowVersion::Min()); + RemoveBrokenRanges(); + } } -void TLockLocker::RemoveLock(ui64 lockId) { - RemoveBrokenLocks(); - RemoveOneLock(lockId); +void TLockLocker::RemoveLock(ui64 lockId, ILocksDb* db) { + RemoveOneLock(lockId, db); } void TLockLocker::UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo) { @@ -309,29 +585,49 @@ void TLockLocker::RemoveSchema(const TPathId& tableId) { Y_VERIFY(Tables.empty()); Locks.clear(); ShardLocks.clear(); - BrokenLocks.clear(); + BrokenLocks.Clear(); CleanupPending.clear(); - BrokenCandidates.clear(); CleanupCandidates.clear(); + PendingSubscribeLocks.clear(); + RemovedPersistentLocks.clear(); + Limiter.Clear(); } +bool TLockLocker::ForceShardLock(const TPathId& tableId) const { + auto it = Tables.find(tableId); + if (it != Tables.end()) { + if (it->second->RangeCount() > TLockLimiter::LockLimit()) { + return true; + } + } + return false; +} -bool TLockLocker::ForceShardLock(const THashSet<TPathId>& rangeTables) const { - for (const TPathId& tableId : rangeTables) { - auto it = Tables.find(tableId); - Y_VERIFY(it != Tables.end()); - if (it->second->RangeCount() > TLockLimiter::LockLimit()) +bool TLockLocker::ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const { + for (auto& table : readTables) { + if (table.RangeCount() > TLockLimiter::LockLimit()) return true; } return false; } -void TLockLocker::ScheduleLockCleanup(ui64 lockId, const TRowVersion& at) { +void TLockLocker::ScheduleBrokenLock(TLockInfo* lock) { + auto it = Locks.find(lock->GetLockId()); + Y_VERIFY(it != Locks.end() && it->second.Get() == lock, + "Sanity check: adding an unknown broken lock"); + if (lock->IsPersistent()) { + BrokenPersistentLocks.PushBack(lock); + } else if (!lock->InBrokenLocks) { + BrokenLocks.PushBack(lock); + ++BrokenLocksCount_; + lock->InBrokenLocks = true; + } +} + +void TLockLocker::ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at) { if (at) { - BrokenCandidates.emplace(lockId, at); CleanupCandidates.emplace(lockId, at); } else { - BrokenLocks.push_back(lockId); CleanupPending.push_back(lockId); } @@ -340,8 +636,15 @@ void TLockLocker::ScheduleLockCleanup(ui64 lockId, const TRowVersion& at) { } } -void TLockLocker::RemoveSubscribedLock(ui64 lockId) { - RemoveLock(lockId); +void TLockLocker::RemoveSubscribedLock(ui64 lockId, ILocksDb* db) { + RemoveLock(lockId, db); +} + +void TLockLocker::SaveBrokenPersistentLocks(ILocksDb* db) { + while (BrokenPersistentLocks) { + TLockInfo* lock = BrokenPersistentLocks.PopFront(); + lock->PersistBrokenLock(db); + } } // TLockLocker.TLockLimiter @@ -379,6 +682,15 @@ void TLockLocker::TLockLimiter::TouchLock(ui64 lockId) { LocksQueue.Find(lockId); } +TLockInfo::TPtr TLockLocker::TLockLimiter::AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs) { + LocksQueue.Insert(lockId, createTs); + return TLockInfo::TPtr(new TLockInfo(Parent, lockId, lockNodeId, generation, counter, createTs)); +} + +void TLockLocker::TLockLimiter::Clear() { + LocksQueue.Clear(); +} + // TSysLocks TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { @@ -386,101 +698,132 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { TMicrosecTimerCounter measureApplyLocks(*Self, COUNTER_APPLY_LOCKS_USEC); - auto &checkVersion = Update->CheckVersion; - auto &breakVersion = Update->BreakVersion; + // Note: we don't use CheckVersion here, because ApplyLocks is all about + // setting locks, not validating them. If the lock is broken in any + // version, then extending it is pointless: validation would be at + // some point in the future, where it is broken already. + TRowVersion breakVersion = Update->BreakVersion; + + // TODO: move this somewhere earlier, like the start of a new update guard + Locker.RemoveBrokenRanges(); - if (Update->ShardBreak) { - Locker.BreakShardLocks(breakVersion); + if (Update->BreakLocks) { + Locker.BreakLocks(Update->BreakLocks, breakVersion); } - if (Update->PointBreaks.size()) { - for (const auto& key : Update->PointBreaks) { - Locker.BreakLocks(key, breakVersion); - } + if (Update->BreakShardLocks) { + Locker.BreakLocks(Update->BreakShardLocks, breakVersion); } - if (Update->AllBreaks.size()) { - for (const auto& pathId : Update->AllBreaks) { - Locker.BreakAllLocks(pathId, breakVersion); - } + if (Update->BreakAllLocks) { + Locker.BreakLocks(Update->BreakAllLocks, breakVersion); } - if (!Update->Erases.empty() && Self->TabletCounters) { - Self->IncCounter(COUNTER_LOCKS_ERASED, Update->Erases.size()); + Locker.SaveBrokenPersistentLocks(Db); + + // Merge shard lock conflicts into write conflicts, we do this once as an optimization + for (auto& table : Update->WriteConflictShardLocks) { + for (auto* lock : table.ShardLocks) { + if (lock->GetLockId() != Update->LockTxId) { + Update->WriteConflictLocks.PushBack(lock); + } + } } - for (ui64 lockId : Update->Erases) { + size_t erases = 0; + while (Update->EraseLocks) { Y_VERIFY(!Update->HasLocks(), "Can't erase and set locks in one Tx"); - if (breakVersion) - Locker.BreakLock(lockId, breakVersion); - else - Locker.RemoveLock(lockId); + auto* lock = Update->EraseLocks.PopFront(); + Locker.RemoveLock(lock->GetLockId(), Db); + ++erases; + } + + if (erases > 0 && Self->TabletCounters) { + Self->IncCounter(COUNTER_LOCKS_ERASED, erases); } - if (!Update->HasLocks()) + if (!Update->HasLocks()) { + // Adding read/write conflicts implies locking + Y_VERIFY(!Update->ReadConflictLocks); + Y_VERIFY(!Update->WriteConflictLocks); return TVector<TLock>(); + } - if (Locker.ForceShardLock(Update->RangeTables)) - Update->ShardLock = true; + bool shardLock = Locker.ForceShardLock(Update->ReadTables); + TLockInfo::TPtr lock; ui64 counter = TLock::ErrorNotSet; - ui32 numNotSet = 0; + if (Update->BreakOwn) { counter = TLock::ErrorAlreadyBroken; - } else if (Update->ShardLock) { - TLockInfo::TPtr lock = Locker.AddShardLock(Update->LockTxId, Update->LockNodeId, Update->AffectedTables, checkVersion); - if (lock) { - Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter)); - counter = lock->GetCounter(checkVersion); - } else { - ++numNotSet; - } + Locker.ForceBreakLock(Update->LockTxId); + Locker.SaveBrokenPersistentLocks(Db); } else { - for (const auto& key : Update->PointLocks) { - TLockInfo::TPtr lock = Locker.AddPointLock(Update->LockTxId, Update->LockNodeId, key, checkVersion); - if (lock) { - Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter)); - counter = lock->GetCounter(checkVersion); + lock = Locker.GetOrAddLock(Update->LockTxId, Update->LockNodeId); + if (!lock) { + counter = TLock::ErrorTooMuch; + } else if (lock->IsBroken()) { + counter = TLock::ErrorBroken; + } else { + if (shardLock) { + Locker.AddShardLock(lock, Update->ReadTables); + if (Self->TabletCounters) { + Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD); + } } else { - ++numNotSet; + for (const auto& key : Update->PointLocks) { + Locker.AddPointLock(lock, key); + } + for (const auto& key : Update->RangeLocks) { + Locker.AddRangeLock(lock, key); + } } - } + if (Update->WriteTables) { + Locker.AddWriteLock(lock, Update->WriteTables); + } + counter = lock->GetCounter(); + Update->Lock = lock; - for (const auto& key : Update->RangeLocks) { - TLockInfo::TPtr lock = Locker.AddRangeLock(Update->LockTxId, Update->LockNodeId, key, checkVersion); - if (lock) { - Y_VERIFY(counter == lock->GetCounter(checkVersion) || TLock::IsNotSet(counter)); - counter = lock->GetCounter(checkVersion); - } else { - ++numNotSet; + if (lock->IsPersistent()) { + lock->PersistRanges(Db); + } + for (auto& readConflictLock : Update->ReadConflictLocks) { + readConflictLock.AddConflict(lock.Get(), Db); + } + for (auto& writeConflictLock : Update->WriteConflictLocks) { + lock->AddConflict(&writeConflictLock, Db); } - } - } - if (numNotSet) { - counter = TLock::ErrorTooMuch; + if (lock->GetWriteTables() && !lock->IsPersistent()) { + // We need to persist a new lock + lock->PersistLock(Db); + } + } } UpdateCounters(counter); // We have to tell client that there were some locks (even if we don't set them) TVector<TLock> out; - out.reserve(Update->AffectedTables.size()); - for (const TPathId& pathId : Update->AffectedTables) { - out.emplace_back(MakeLock(Update->LockTxId, counter, pathId)); + for (auto& table : Update->AffectedTables) { + out.emplace_back(MakeLock(Update->LockTxId, lock ? lock->GetGeneration() : Self->Generation(), counter, table.GetTableId())); } return out; } -void TSysLocks::UpdateCounters(ui64 counter) { +void TSysLocks::UpdateCounters() { if (!Self->TabletCounters) return; Self->IncCounter(COUNTER_LOCKS_ACTIVE_PER_SHARD, LocksCount()); Self->IncCounter(COUNTER_LOCKS_BROKEN_PER_SHARD, BrokenLocksCount()); - if (Update && Update->ShardLock) { - Self->IncCounter(COUNTER_LOCKS_WHOLE_SHARD); - } +} + +void TSysLocks::UpdateCounters(ui64 counter) { + if (!Self->TabletCounters) + return; + + UpdateCounters(); if (TLock::IsError(counter)) { if (TLock::IsBroken(counter)) { @@ -519,17 +862,17 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const { auto &checkVersion = Update->CheckVersion; TLockInfo::TPtr txLock = Locker.GetLock(lockTxId, checkVersion); if (txLock) { - const auto& tableIds = txLock->GetAffectedTables(); + const auto& tableIds = txLock->GetReadTables(); if (key.size() == 2) { // locks v1 Y_VERIFY(tableIds.size() == 1); - return MakeAndLogLock(lockTxId, txLock->GetCounter(checkVersion), *tableIds.begin()); + return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), *tableIds.begin()); } else { // locks v2 Y_VERIFY(key.size() == 4); TPathId tableId; ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::SchemeShard, tableId.OwnerId); ok = ok && TLocksTable::ExtractKey(key, TLocksTable::EColumns::PathId, tableId.LocalPathId); if (ok && tableId && tableIds.contains(tableId)) - return MakeAndLogLock(lockTxId, txLock->GetCounter(checkVersion), tableId); + return MakeAndLogLock(lockTxId, txLock->GetGeneration(), txLock->GetCounter(checkVersion), tableId); } } @@ -539,7 +882,9 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const { void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) { Y_VERIFY(Update); - Update->EraseLock(GetLockId(key)); + if (auto* lock = Locker.FindLockPtr(GetLockId(key))) { + Update->AddEraseLock(lock); + } } void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { @@ -549,7 +894,7 @@ void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef<const TCell>& k if (lockTxId) { Y_VERIFY(Update); - Update->SetLock(tableId, Locker.MakePoint(tableId, key), lockTxId, lockNodeId); + Update->AddPointLock(Locker.MakePoint(tableId, key), lockTxId, lockNodeId); } } @@ -565,18 +910,82 @@ void TSysLocks::SetLock(const TTableId& tableId, const TTableRange& range, ui64 if (lockTxId) { Y_VERIFY(Update); - Update->SetLock(tableId, Locker.MakeRange(tableId, range), lockTxId, lockNodeId); + Update->AddRangeLock(Locker.MakeRange(tableId, range), lockTxId, lockNodeId); } } -void TSysLocks::BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key) { - Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks))); +void TSysLocks::SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { + Y_VERIFY(!TSysTables::IsSystemTable(tableId)); if (!Self->IsUserTable(tableId)) return; - if (Locker.TableHasLocks(tableId)) - Update->BreakLocks(Locker.MakePoint(tableId, key)); - Update->BreakShardLock(); + if (auto* table = Locker.FindTablePtr(tableId)) { + Update->AddWriteLock(table, lockTxId, lockNodeId); + AddWriteConflict(tableId, key, lockTxId, lockNodeId); + } +} + +void TSysLocks::BreakLock(ui64 lockId) { + if (auto* lock = Locker.FindLockPtr(lockId)) { + Update->AddBreakLock(lock); + } +} + +void TSysLocks::BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key) { + Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks))); + + if (auto* table = Locker.FindTablePtr(tableId)) { + if (table->HasRangeLocks()) { + // Note: avoid copying the key, find all locks here + table->Ranges.EachIntersection(key, [update = Update](const TRangeTreeBase::TRange&, TLockInfo* lock) { + update->AddBreakLock(lock); + }); + } + if (table->HasShardLocks()) { + // We also want to break all shard locks in this table + Update->AddBreakShardLocks(table); + } + } +} + +void TSysLocks::AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) { + Y_UNUSED(lockNodeId); + + if (conflictId != lockTxId) { + if (auto* lock = Locker.FindLockPtr(conflictId)) { + Update->AddReadConflictLock(lock); + } + } +} + +void TSysLocks::AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId) { + Y_UNUSED(lockNodeId); + + if (conflictId != lockTxId) { + if (auto* lock = Locker.FindLockPtr(conflictId)) { + Update->AddWriteConflictLock(lock); + } + } +} + +void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId) { + Y_UNUSED(lockTxId); + Y_UNUSED(lockNodeId); + + if (auto* table = Locker.FindTablePtr(tableId)) { + if (table->HasRangeLocks()) { + // Note: avoid copying the key, find all locks here + table->Ranges.EachIntersection(key, [update = Update](const TRangeTreeBase::TRange&, TLockInfo* lock) { + if (lock->GetLockId() != update->LockTxId) { + update->AddWriteConflictLock(lock); + } + }); + } + if (table->HasShardLocks()) { + // We also want to conflict with all shard locks in this table + Update->AddWriteConflictShardLocks(table); + } + } } void TSysLocks::BreakAllLocks(const TTableId& tableId) { @@ -584,9 +993,11 @@ void TSysLocks::BreakAllLocks(const TTableId& tableId) { if (!Self->IsUserTable(tableId)) return; - if (Locker.TableHasLocks(tableId)) - Update->BreakAllLocks(tableId); - Update->BreakShardLock(); + if (auto* table = Locker.FindTablePtr(tableId)) { + if (table->HasRangeLocks() || table->HasShardLocks()) { + Update->AddBreakAllLocks(table); + } + } } void TSysLocks::BreakSetLocks(ui64 lockTxId, ui32 lockNodeId) { @@ -602,23 +1013,77 @@ bool TSysLocks::IsMyKey(const TArrayRef<const TCell>& key) const { return ok && (Self->TabletID() == tabletId); } -TSysLocks::TLock TSysLocks::MakeLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const { +bool TSysLocks::HasWriteLock(ui64 lockId, const TTableId& tableId) const { + if (auto* lock = Locker.FindLockPtr(lockId)) { + return lock->WriteTables.contains(tableId.PathId); + } + + return false; +} + +bool TSysLocks::HasWriteLocks(const TTableId& tableId) const { + if (auto* table = Locker.FindTablePtr(tableId.PathId)) { + return !table->WriteLocks.empty(); + } + + return false; +} + +bool TSysLocks::MayAddLock(ui64 lockId) const { + if (auto* lock = Locker.FindLockPtr(lockId)) { + // We may expand the lock unless it's broken + return !lock->IsBroken(); + } + + Y_VERIFY(Db, "MayAddLock needs a valid locks database"); + return Db->MayAddLock(lockId); +} + +TSysLocks::TLock TSysLocks::MakeLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const { TLock lock; lock.LockId = lockTxId; lock.DataShard = Self->TabletID(); - lock.Generation = Self->Generation(); + lock.Generation = generation; lock.Counter = counter; lock.SchemeShard = pathId.OwnerId; lock.PathId = pathId.LocalPathId; return lock; } -TSysLocks::TLock TSysLocks::MakeAndLogLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const { - TLock lock = MakeLock(lockTxId, counter, pathId); +TSysLocks::TLock TSysLocks::MakeAndLogLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const { + TLock lock = MakeLock(lockTxId, generation, counter, pathId); if (AccessLog) AccessLog->Locks[lockTxId] = lock; return lock; } +bool TSysLocks::Load(ILocksDb& db) { + TVector<ILocksDb::TLockRow> rows; + if (!db.Load(rows)) { + return false; + } + + Locker.Clear(); + + for (auto& lockRow : rows) { + TLockInfo::TPtr lock = Locker.AddLock(lockRow.LockId, lockRow.LockNodeId, lockRow.Generation, lockRow.Counter, TInstant::MicroSeconds(lockRow.CreateTs)); + for (auto& rangeRow : lockRow.Ranges) { + lock->RestorePersistentRange(rangeRow.RangeId, rangeRow.TableId, ELockRangeFlags(rangeRow.Flags)); + } + } + + for (auto& lockRow : rows) { + auto* lock = Locker.FindLockPtr(lockRow.LockId); + Y_VERIFY(lock); + for (ui64 conflictId : lockRow.Conflicts) { + if (auto* otherLock = Locker.FindLockPtr(conflictId)) { + lock->RestorePersistentConflict(otherLock); + } + } + } + + return true; +} + }} diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index 8217e6cbbde..6cb67bfe8cc 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -20,6 +20,53 @@ namespace NDataShard { struct TUserTable; +class ILocksDb { +protected: + ~ILocksDb() = default; + +public: + struct TLockRange { + ui64 RangeId; + TPathId TableId; + ui64 Flags; + TString Data; + }; + + struct TLockRow { + ui64 LockId; + ui32 LockNodeId; + ui32 Generation; + ui64 Counter; + ui64 CreateTs; + ui64 Flags; + + TVector<TLockRange> Ranges; + TVector<ui64> Conflicts; + }; + + virtual bool Load(TVector<TLockRow>& rows) = 0; + + // Returns true when a new lock may be added with the given lockId + // Sometimes new lock cannot be added, e.g. when it had uncommitted changes + // in the past, and adding anything with the same lockId would conflict + // with previous decisions. + virtual bool MayAddLock(ui64 lockId) = 0; + + // Persist adding/removing a lock info + virtual void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) = 0; + virtual void PersistLockCounter(ui64 lockId, ui64 counter) = 0; + virtual void PersistRemoveLock(ui64 lockId) = 0; + + // Persist adding/removing info on locked ranges + virtual void PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags = 0, const TString& data = {}) = 0; + virtual void PersistRangeFlags(ui64 lockId, ui64 rangeId, ui64 flags) = 0; + virtual void PersistRemoveRange(ui64 lockId, ui64 rangeId) = 0; + + // Persist a conflict, i.e. this lock must break some other lock on commit + virtual void PersistAddConflict(ui64 lockId, ui64 otherLockId) = 0; + virtual void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) = 0; +}; + class TLocksDataShard { public: TLocksDataShard(TTabletCountersBase* const &tabletCounters) @@ -99,6 +146,7 @@ private: class TLockInfo; class TTableLocks; class TLockLocker; +class TSysLocks; /// struct TPointKey { @@ -152,23 +200,73 @@ struct TPendingSubscribeLock { } }; +enum class ELockConflictFlags : ui8 { + None = 0, + BreakThemOnOurCommit = 1, + BreakUsOnTheirCommit = 2, +}; + +using ELockConflictFlagsRaw = std::underlying_type<ELockConflictFlags>::type; + +inline ELockConflictFlags operator|(ELockConflictFlags a, ELockConflictFlags b) { return ELockConflictFlags(ELockConflictFlagsRaw(a) | ELockConflictFlagsRaw(b)); } +inline ELockConflictFlags operator&(ELockConflictFlags a, ELockConflictFlags b) { return ELockConflictFlags(ELockConflictFlagsRaw(a) & ELockConflictFlagsRaw(b)); } +inline ELockConflictFlags& operator|=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a | b; } +inline ELockConflictFlags& operator&=(ELockConflictFlags& a, ELockConflictFlags b) { return a = a & b; } +inline bool operator!(ELockConflictFlags c) { return ELockConflictFlagsRaw(c) == 0; } + +enum class ELockRangeFlags : ui8 { + None = 0, + Read = 1, + Write = 2, +}; + +using ELockRangeFlagsRaw = std::underlying_type<ELockRangeFlags>::type; + +inline ELockRangeFlags operator|(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); } +inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); } +inline ELockRangeFlags& operator|=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a | b; } +inline ELockRangeFlags& operator&=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a & b; } +inline bool operator!(ELockRangeFlags c) { return ELockRangeFlagsRaw(c) == 0; } + +// Tags for various intrusive lists +struct TLockInfoBreakListTag {}; +struct TLockInfoEraseListTag {}; +struct TLockInfoReadConflictListTag {}; +struct TLockInfoWriteConflictListTag {}; +struct TLockInfoBrokenListTag {}; +struct TLockInfoBrokenPersistentListTag {}; + /// Aggregates shard, point and range locks -class TLockInfo : public TSimpleRefCount<TLockInfo> { +class TLockInfo + : public TSimpleRefCount<TLockInfo> + , public TIntrusiveListItem<TLockInfo, TLockInfoBreakListTag> + , public TIntrusiveListItem<TLockInfo, TLockInfoEraseListTag> + , public TIntrusiveListItem<TLockInfo, TLockInfoReadConflictListTag> + , public TIntrusiveListItem<TLockInfo, TLockInfoWriteConflictListTag> + , public TIntrusiveListItem<TLockInfo, TLockInfoBrokenListTag> + , public TIntrusiveListItem<TLockInfo, TLockInfoBrokenPersistentListTag> +{ friend class TTableLocks; friend class TLockLocker; + friend class TSysLocks; public: using TPtr = TIntrusivePtr<TLockInfo>; TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId); + TLockInfo(TLockLocker * locker, ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs); ~TLockInfo(); + ui32 GetGeneration() const { return Generation; } ui64 GetCounter(const TRowVersion& at = TRowVersion::Max()) const { return !BreakVersion || at < *BreakVersion ? Counter : Max<ui64>(); } bool IsBroken(const TRowVersion& at = TRowVersion::Max()) const { return GetCounter(at) == Max<ui64>(); } size_t NumPoints() const { return Points.size(); } size_t NumRanges() const { return Ranges.size(); } bool IsShardLock() const { return ShardLock; } + bool IsWriteLock() const { return !WriteTables.empty(); } + bool IsPersistent() const { return Persistent; } + bool HasUnpersistedRanges() const { return UnpersistedRanges; } //ui64 MemorySize() const { return 1; } // TODO bool MayHavePointsAndRanges() const { return !ShardLock && (!BreakVersion || *BreakVersion); } @@ -177,33 +275,84 @@ public: ui32 GetLockNodeId() const { return LockNodeId; } TInstant GetCreationTime() const { return CreationTime; } - const THashSet<TPathId>& GetAffectedTables() const { return AffectedTables; } + const THashSet<TPathId>& GetReadTables() const { return ReadTables; } + const THashSet<TPathId>& GetWriteTables() const { return WriteTables; } const TVector<TPointKey>& GetPoints() const { return Points; } const TVector<TRangeKey>& GetRanges() const { return Ranges; } + void PersistLock(ILocksDb* db); + void PersistBrokenLock(ILocksDb* db); + void PersistRemoveLock(ILocksDb* db); + + void PersistRanges(ILocksDb* db); + + void AddConflict(TLockInfo* otherLock, ILocksDb* db); + void PersistConflicts(ILocksDb* db); + void CleanupConflicts(); + + void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags); + void RestorePersistentConflict(TLockInfo* otherLock); + private: - void AddShardLock(const THashSet<TPathId>& affectedTables); + void MakeShardLock(); + bool AddShardLock(const TPathId& pathId); bool AddPoint(const TPointKey& point); bool AddRange(const TRangeKey& range); - void SetBroken(const TRowVersion& at); + bool AddWriteLock(const TPathId& pathId); + void SetBroken(TRowVersion at); + + void PersistAddRange(const TPathId& tableId, ELockRangeFlags flags, ILocksDb* db); + +private: + struct TPersistentRange { + ui64 Id; + TPathId TableId; + ELockRangeFlags Flags; + }; private: TLockLocker * Locker; ui64 LockId; ui32 LockNodeId; + ui32 Generation; ui64 Counter; TInstant CreationTime; - THashSet<TPathId> AffectedTables; + THashSet<TPathId> ReadTables; + THashSet<TPathId> WriteTables; TVector<TPointKey> Points; TVector<TRangeKey> Ranges; bool ShardLock = false; + bool Persistent = false; + bool UnpersistedRanges = false; + bool InBrokenLocks = false; std::optional<TRowVersion> BreakVersion; + + // A set of locks we must break on commit + THashMap<TLockInfo*, ELockConflictFlags> ConflictLocks; + TVector<TPersistentRange> PersistentRanges; }; +struct TTableLocksReadListTag {}; +struct TTableLocksWriteListTag {}; +struct TTableLocksAffectedListTag {}; +struct TTableLocksBreakShardListTag {}; +struct TTableLocksBreakAllListTag {}; +struct TTableLocksWriteConflictShardListTag {}; + /// -class TTableLocks : public TSimpleRefCount<TTableLocks> { +class TTableLocks + : public TSimpleRefCount<TTableLocks> + , public TIntrusiveListItem<TTableLocks, TTableLocksReadListTag> + , public TIntrusiveListItem<TTableLocks, TTableLocksWriteListTag> + , public TIntrusiveListItem<TTableLocks, TTableLocksAffectedListTag> + , public TIntrusiveListItem<TTableLocks, TTableLocksBreakShardListTag> + , public TIntrusiveListItem<TTableLocks, TTableLocksBreakAllListTag> + , public TIntrusiveListItem<TTableLocks, TTableLocksWriteConflictShardListTag> +{ + friend class TSysLocks; + public: using TPtr = TIntrusivePtr<TTableLocks>; @@ -215,11 +364,16 @@ public: TPathId GetTableId() const { return TableId; } - void AddPointLock(const TPointKey& point, const TLockInfo::TPtr& lock); - void AddRangeLock(const TRangeKey& range, const TLockInfo::TPtr& lock); - void RemoveLock(const TLockInfo::TPtr& lock); - void BreakLocks(TConstArrayRef<TCell> key, const TRowVersion& at); - void BreakAllLocks(const TRowVersion& at); + void AddShardLock(TLockInfo* lock); + void AddPointLock(const TPointKey& point, TLockInfo* lock); + void AddRangeLock(const TRangeKey& range, TLockInfo* lock); + void AddWriteLock(TLockInfo* lock); + void RemoveReadLock(TLockInfo* lock); + void RemoveShardLock(TLockInfo* lock); + void RemoveRangeLock(TLockInfo* lock); + void RemoveWriteLock(TLockInfo* lock); + bool BreakShardLocks(const TRowVersion& at); + bool BreakAllLocks(const TRowVersion& at); ui64 NumKeyColumns() const { return KeyColumnTypes.size(); @@ -238,21 +392,28 @@ public: } } - bool HasLocks() const { return Ranges.Size() > 0; } + bool HasShardLocks() const { return !ShardLocks.empty(); } + bool HasRangeLocks() const { return Ranges.Size() > 0; } ui64 RangeCount() const { return Ranges.Size(); } void Clear() { Ranges.Clear(); + ShardLocks.clear(); + WriteLocks.clear(); } private: const TPathId TableId; TVector<NScheme::TTypeId> KeyColumnTypes; TRangeTreap<TLockInfo*> Ranges; + THashSet<TLockInfo*> ShardLocks; + THashSet<TLockInfo*> WriteLocks; }; /// Owns and manages locks class TLockLocker { + friend class TSysLocks; + public: /// Prevent unlimited lock's count growth class TLockLimiter { @@ -279,6 +440,9 @@ public: void RemoveLock(ui64 lockId); void TouchLock(ui64 lockId); + TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs); + void Clear(); + // TODO: AddPoint, AddRange private: @@ -299,25 +463,45 @@ public: Tables.clear(); } - TLockInfo::TPtr AddShardLock(ui64 lockTxId, ui32 lockNodeId, const THashSet<TPathId>& affectedTables, const TRowVersion& at); - TLockInfo::TPtr AddPointLock(ui64 lockTxId, ui32 lockNodeId, const TPointKey& key, const TRowVersion& at); - TLockInfo::TPtr AddRangeLock(ui64 lockTxId, ui32 lockNodeId, const TRangeKey& key, const TRowVersion& at); + void AddPointLock(const TLockInfo::TPtr& lock, const TPointKey& key); + void AddRangeLock(const TLockInfo::TPtr& lock, const TRangeKey& key); + void AddShardLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables); + void AddWriteLock(const TLockInfo::TPtr& lock, TIntrusiveList<TTableLocks, TTableLocksWriteListTag>& writeTables); + TLockInfo::TPtr GetLock(ui64 lockTxId, const TRowVersion& at) const; ui64 LocksCount() const { return Locks.size(); } - ui64 BrokenLocksCount() const { return BrokenLocks.size() + BrokenCandidates.size(); } + ui64 BrokenLocksCount() const { return BrokenLocksCount_; } + + void BreakLocks(TIntrusiveList<TLockInfo, TLockInfoBreakListTag>& locks, const TRowVersion& at); + void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag>& tables, const TRowVersion& at); + void BreakLocks(TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag>& tables, const TRowVersion& at); + void ForceBreakLock(ui64 lockId); + void RemoveLock(ui64 lockTxId, ILocksDb* db); + + TLockInfo* FindLockPtr(ui64 lockId) const { + auto it = Locks.find(lockId); + if (it != Locks.end()) { + return it->second.Get(); + } else { + return nullptr; + } + } - void BreakShardLocks(const TRowVersion& at); - void BreakLocks(const TPointKey& point, const TRowVersion& at); - void BreakAllLocks(const TPathId& pathId, const TRowVersion& at); - void BreakLock(ui64 lockTxId, const TRowVersion& at); - void RemoveLock(ui64 lockTxId); + TTableLocks* FindTablePtr(const TTableId& tableId) const { + auto it = Tables.find(tableId.PathId); + if (it != Tables.end()) { + return it->second.Get(); + } else { + return nullptr; + } + } - bool TableHasLocks(const TTableId& tableId) const { + bool TableHasRangeLocks(const TTableId& tableId) const { auto it = Tables.find(tableId.PathId); if (it == Tables.end()) return false; - return it->second->HasLocks(); + return it->second->HasRangeLocks(); } TPointKey MakePoint(const TTableId& tableId, TConstArrayRef<TCell> point) const { @@ -340,10 +524,11 @@ public: void UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo); void RemoveSchema(const TPathId& tableId); - bool ForceShardLock(const THashSet<TPathId>& rangeTables) const; + bool ForceShardLock(const TPathId& tableId) const; + bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const; - // optimisation: set to remove broken lock at next Remove() - void ScheduleLockCleanup(ui64 lockId, const TRowVersion& at); + void ScheduleBrokenLock(TLockInfo* lock); + void ScheduleRemoveBrokenRanges(ui64 lockId, const TRowVersion& at); TPendingSubscribeLock NextPendingSubscribeLock() { TPendingSubscribeLock result; @@ -354,20 +539,43 @@ public: return result; } - void RemoveSubscribedLock(ui64 lockId); + void RemoveSubscribedLock(ui64 lockId, ILocksDb* db); + ui32 Generation() const { return Self->Generation(); } ui64 IncCounter() { return Counter++; }; + TVector<TLockInfo::TPtr>& GetRemovedPersistentLocks() { + return RemovedPersistentLocks; + } + + void Clear() { + for (auto& pr : Tables) { + pr.second->Clear(); + } + Locks.clear(); + ShardLocks.clear(); + BrokenLocks.Clear(); + CleanupPending.clear(); + CleanupCandidates.clear(); + PendingSubscribeLocks.clear(); + RemovedPersistentLocks.clear(); + Limiter.Clear(); + } + private: THolder<TLocksDataShard> Self; THashMap<ui64, TLockInfo::TPtr> Locks; // key is LockId THashMap<TPathId, TTableLocks::TPtr> Tables; THashSet<ui64> ShardLocks; - TVector<ui64> BrokenLocks; // LockIds of broken locks (optimisation) - TVector<ui64> CleanupPending; // LockIds of broken locks with pending cleanup - TPriorityQueue<TVersionedLockId> BrokenCandidates; + // A list of broken, but not yet removed locks + TIntrusiveList<TLockInfo, TLockInfoBrokenPersistentListTag> BrokenPersistentLocks; + TIntrusiveList<TLockInfo, TLockInfoBrokenListTag> BrokenLocks; + size_t BrokenLocksCount_ = 0; + // A queue of locks that need their ranges to be cleaned up + TVector<ui64> CleanupPending; TPriorityQueue<TVersionedLockId> CleanupCandidates; TList<TPendingSubscribeLock> PendingSubscribeLocks; + TVector<TLockInfo::TPtr> RemovedPersistentLocks; TLockLimiter Limiter; ui64 Counter; @@ -380,71 +588,91 @@ private: void RemoveBrokenRanges(); TLockInfo::TPtr GetOrAddLock(ui64 lockId, ui32 lockNodeId); - void RemoveOneLock(ui64 lockId); + TLockInfo::TPtr AddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, TInstant createTs); + void RemoveOneLock(ui64 lockId, ILocksDb* db = nullptr); void RemoveBrokenLocks(); + + void SaveBrokenPersistentLocks(ILocksDb* db); }; /// A portion of locks update struct TLocksUpdate { ui64 LockTxId = 0; ui32 LockNodeId = 0; - TVector<TPointKey> PointLocks; - TVector<TRangeKey> RangeLocks; - TVector<TPointKey> PointBreaks; - THashSet<TPathId> AllBreaks; - TVector<ui64> Erases; - bool ShardLock = false; - bool ShardBreak = false; - THashSet<TPathId> AffectedTables; - THashSet<TPathId> RangeTables; + TLockInfo::TPtr Lock; + + TStackVec<TPointKey, 4> PointLocks; + TStackVec<TRangeKey, 4> RangeLocks; + + TIntrusiveList<TTableLocks, TTableLocksReadListTag> ReadTables; + TIntrusiveList<TTableLocks, TTableLocksWriteListTag> WriteTables; + TIntrusiveList<TTableLocks, TTableLocksAffectedListTag> AffectedTables; + + TIntrusiveList<TLockInfo, TLockInfoBreakListTag> BreakLocks; + TIntrusiveList<TTableLocks, TTableLocksBreakShardListTag> BreakShardLocks; + TIntrusiveList<TTableLocks, TTableLocksBreakAllListTag> BreakAllLocks; + + TIntrusiveList<TLockInfo, TLockInfoReadConflictListTag> ReadConflictLocks; + TIntrusiveList<TLockInfo, TLockInfoWriteConflictListTag> WriteConflictLocks; + TIntrusiveList<TTableLocks, TTableLocksWriteConflictShardListTag> WriteConflictShardLocks; + + TIntrusiveList<TLockInfo, TLockInfoEraseListTag> EraseLocks; TRowVersion CheckVersion = TRowVersion::Max(); TRowVersion BreakVersion = TRowVersion::Min(); bool BreakOwn = false; - void Clear() { - LockTxId = 0; - LockNodeId = 0; - ShardLock = false; - ShardBreak = false; - PointLocks.clear(); - PointBreaks.clear(); - AllBreaks.clear(); - Erases.clear(); - } - bool HasLocks() const { - return ShardLock || PointLocks.size() || RangeLocks.size(); + return bool(AffectedTables); } - void SetLock(const TTableId& tableId, const TRangeKey& range, ui64 lockId, ui32 lockNodeId) { + void AddRangeLock(const TRangeKey& range, ui64 lockId, ui32 lockNodeId) { Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); - AffectedTables.insert(tableId.PathId); - RangeTables.insert(tableId.PathId); + ReadTables.PushBack(range.Table.Get()); + AffectedTables.PushBack(range.Table.Get()); RangeLocks.push_back(range); } - void SetLock(const TTableId& tableId, const TPointKey& key, ui64 lockId, ui32 lockNodeId) { + void AddPointLock(const TPointKey& key, ui64 lockId, ui32 lockNodeId) { Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); - AffectedTables.insert(tableId.PathId); + ReadTables.PushBack(key.Table.Get()); + AffectedTables.PushBack(key.Table.Get()); PointLocks.push_back(key); } - void BreakLocks(const TPointKey& key) { - PointBreaks.push_back(key); + void AddWriteLock(TTableLocks* table, ui64 lockId, ui32 lockNodeId) { + Y_VERIFY(LockTxId == lockId && LockNodeId == lockNodeId); + WriteTables.PushBack(table); + AffectedTables.PushBack(table); + } + + void AddBreakLock(TLockInfo* lock) { + BreakLocks.PushBack(lock); } - void BreakAllLocks(const TTableId& tableId) { - AllBreaks.insert(tableId.PathId); + void AddBreakShardLocks(TTableLocks* table) { + BreakShardLocks.PushBack(table); } - void BreakShardLock() { - ShardBreak = true; + void AddBreakAllLocks(TTableLocks* table) { + BreakAllLocks.PushBack(table); } - void EraseLock(ui64 lockId) { - Erases.push_back(lockId); + void AddReadConflictLock(TLockInfo* lock) { + ReadConflictLocks.PushBack(lock); + } + + void AddWriteConflictLock(TLockInfo* lock) { + WriteConflictLocks.PushBack(lock); + } + + void AddWriteConflictShardLocks(TTableLocks* table) { + WriteConflictShardLocks.PushBack(table); + } + + void AddEraseLock(TLockInfo* lock) { + EraseLocks.PushBack(lock); } void BreakSetLocks(ui64 lockId, ui32 lockNodeId) { @@ -467,23 +695,24 @@ public: TSysLocks(const T * self) : Self(new TLocksDataShardAdapter<T>(self)) , Locker(self) - , Update(nullptr) - , AccessLog(nullptr) - , Cache(nullptr) {} - void SetTxUpdater(TLocksUpdate * up) { + void SetTxUpdater(TLocksUpdate* up) { Update = up; } - void SetAccessLog(TLocksCache *log) { + void SetAccessLog(TLocksCache* log) { AccessLog = log; } - void SetCache(TLocksCache *cache) { + void SetCache(TLocksCache* cache) { Cache = cache; } + void SetDb(ILocksDb* db) { + Db = db; + } + ui64 CurrentLockTxId() const { Y_VERIFY(Update); return Update->LockTxId; @@ -503,10 +732,18 @@ public: void EraseLock(const TArrayRef<const TCell>& syslockKey); void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); void SetLock(const TTableId& tableId, const TTableRange& range, ui64 lockTxId, ui32 lockNodeId); + void SetWriteLock(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); + void BreakLock(ui64 lockId); void BreakLock(const TTableId& tableId, const TArrayRef<const TCell>& key); + void AddReadConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId); + void AddWriteConflict(ui64 conflictId, ui64 lockTxId, ui32 lockNodeId); + void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key, ui64 lockTxId, ui32 lockNodeId); void BreakAllLocks(const TTableId& tableId); void BreakSetLocks(ui64 lockTxId, ui32 lockNodeId); bool IsMyKey(const TArrayRef<const TCell>& key) const; + bool HasWriteLock(ui64 lockId, const TTableId& tableId) const; + bool HasWriteLocks(const TTableId& tableId) const; + bool MayAddLock(ui64 lockId) const; ui64 LocksCount() const { return Locker.LocksCount(); } ui64 BrokenLocksCount() const { return Locker.BrokenLocksCount(); } @@ -526,25 +763,25 @@ public: return Locker.NextPendingSubscribeLock(); } - void RemoveSubscribedLock(ui64 lockId) { - Locker.RemoveSubscribedLock(lockId); + void RemoveSubscribedLock(ui64 lockId, ILocksDb* db) { + Locker.RemoveSubscribedLock(lockId, db); } + void UpdateCounters(); void UpdateCounters(ui64 counter); - // to avoid filling intermidiate Update - // Note: don't forget to call UpdateCounters() when finish with locker - TLockLocker& GetLocker() { return Locker; } + bool Load(ILocksDb& db); private: THolder<TLocksDataShard> Self; TLockLocker Locker; - TLocksUpdate * Update; - TLocksCache *AccessLog; - TLocksCache *Cache; + TLocksUpdate* Update = nullptr; + TLocksCache* AccessLog = nullptr; + TLocksCache* Cache = nullptr; + ILocksDb* Db = nullptr; - TLock MakeLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const; - TLock MakeAndLogLock(ui64 lockTxId, ui64 counter, const TPathId& pathId) const; + TLock MakeLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const; + TLock MakeAndLogLock(ui64 lockTxId, ui32 generation, ui64 counter, const TPathId& pathId) const; static ui64 GetLockId(const TArrayRef<const TCell>& key) { ui64 lockId; diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp new file mode 100644 index 00000000000..3c74047d8be --- /dev/null +++ b/ydb/core/tx/datashard/datashard_locks_db.cpp @@ -0,0 +1,167 @@ +#include "datashard_locks_db.h" + +namespace NKikimr::NDataShard { + +bool TDataShardLocksDb::Load(TVector<TLockRow>& rows) { + using Schema = TDataShard::Schema; + + NIceDb::TNiceDb db(DB); + + rows.clear(); + + // Load locks + THashMap<ui64, size_t> lockIndex; + { + auto rowset = db.Table<Schema::Locks>().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + auto& lock = rows.emplace_back(); + lock.LockId = rowset.GetValue<Schema::Locks::LockId>(); + lock.LockNodeId = rowset.GetValue<Schema::Locks::LockNodeId>(); + lock.Generation = rowset.GetValue<Schema::Locks::Generation>(); + lock.Counter = rowset.GetValue<Schema::Locks::Counter>(); + lock.CreateTs = rowset.GetValue<Schema::Locks::CreateTimestamp>(); + lock.Flags = rowset.GetValue<Schema::Locks::Flags>(); + lockIndex[lock.LockId] = rows.size() - 1; + if (!rowset.Next()) { + return false; + } + } + } + + // Load ranges + { + auto rowset = db.Table<Schema::LockRanges>().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + auto lockId = rowset.GetValue<Schema::LockRanges::LockId>(); + auto it = lockIndex.find(lockId); + if (it != lockIndex.end()) { + auto& lock = rows[it->second]; + auto& range = lock.Ranges.emplace_back(); + range.RangeId = rowset.GetValue<Schema::LockRanges::RangeId>(); + range.TableId.OwnerId = rowset.GetValue<Schema::LockRanges::PathOwnerId>(); + range.TableId.LocalPathId = rowset.GetValue<Schema::LockRanges::LocalPathId>(); + range.Flags = rowset.GetValue<Schema::LockRanges::Flags>(); + range.Data = rowset.GetValue<Schema::LockRanges::Data>(); + } + if (!rowset.Next()) { + return false; + } + } + } + + // Load conflicts + { + auto rowset = db.Table<Schema::LockConflicts>().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + auto lockId = rowset.GetValue<Schema::LockConflicts::LockId>(); + auto it = lockIndex.find(lockId); + if (it != lockIndex.end()) { + auto& lock = rows[it->second]; + lock.Conflicts.push_back(rowset.GetValue<Schema::LockConflicts::ConflictId>()); + } + if (!rowset.Next()) { + return false; + } + } + } + + return true; +} + +bool TDataShardLocksDb::MayAddLock(ui64 lockId) { + for (auto& pr : Self.GetUserTables()) { + auto tid = pr.second->LocalTid; + // We cannot start a new lockId if it has any uncompacted data + if (DB.HasTxData(tid, lockId)) { + return false; + } + } + + return true; +} + +void TDataShardLocksDb::PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::Locks>().Key(lockId).Update( + NIceDb::TUpdate<Schema::Locks::LockNodeId>(lockNodeId), + NIceDb::TUpdate<Schema::Locks::Generation>(generation), + NIceDb::TUpdate<Schema::Locks::Counter>(counter), + NIceDb::TUpdate<Schema::Locks::CreateTimestamp>(createTs), + NIceDb::TUpdate<Schema::Locks::Flags>(flags)); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::Locks>().Key(lockId).Update( + NIceDb::TUpdate<Schema::Locks::Counter>(counter)); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) { + for (auto& pr : Self.GetUserTables()) { + auto tid = pr.second->LocalTid; + // Removing the lock also removes any uncommitted data + if (DB.HasOpenTx(tid, lockId)) { + DB.RemoveTx(tid, lockId); + } + } + + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::Locks>().Key(lockId).Delete(); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags, const TString& data) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::LockRanges>().Key(lockId, rangeId).Update( + NIceDb::TUpdate<Schema::LockRanges::PathOwnerId>(tableId.OwnerId), + NIceDb::TUpdate<Schema::LockRanges::LocalPathId>(tableId.LocalPathId), + NIceDb::TUpdate<Schema::LockRanges::Flags>(flags), + NIceDb::TUpdate<Schema::LockRanges::Data>(data)); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistRangeFlags(ui64 lockId, ui64 rangeId, ui64 flags) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::LockRanges>().Key(lockId, rangeId).Update( + NIceDb::TUpdate<Schema::LockRanges::Flags>(flags)); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistRemoveRange(ui64 lockId, ui64 rangeId) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::LockRanges>().Key(lockId, rangeId).Delete(); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistAddConflict(ui64 lockId, ui64 otherLockId) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::LockConflicts>().Key(lockId, otherLockId).Update(); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistRemoveConflict(ui64 lockId, ui64 otherLockId) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::LockConflicts>().Key(lockId, otherLockId).Delete(); + HasChanges_ = true; +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_locks_db.h b/ydb/core/tx/datashard/datashard_locks_db.h new file mode 100644 index 00000000000..ea2b2cd3b4f --- /dev/null +++ b/ydb/core/tx/datashard/datashard_locks_db.h @@ -0,0 +1,43 @@ +#pragma once +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +class TDataShardLocksDb + : public ILocksDb +{ +public: + TDataShardLocksDb(TDataShard& self, TTransactionContext& txc) + : Self(self) + , DB(txc.DB) + { } + + bool HasChanges() const { + return HasChanges_; + } + + bool Load(TVector<TLockRow>& rows) override; + + bool MayAddLock(ui64 lockId) override; + + // Persist adding/removing a lock info + void PersistAddLock(ui64 lockId, ui32 lockNodeId, ui32 generation, ui64 counter, ui64 createTs, ui64 flags = 0) override; + void PersistLockCounter(ui64 lockId, ui64 counter) override; + void PersistRemoveLock(ui64 lockId) override; + + // Persist adding/removing info on locked ranges + void PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags = 0, const TString& data = {}) override; + void PersistRangeFlags(ui64 lockId, ui64 rangeId, ui64 flags) override; + void PersistRemoveRange(ui64 lockId, ui64 rangeId) override; + + // Persist a conflict, i.e. this lock must break some other lock on commit + void PersistAddConflict(ui64 lockId, ui64 otherLockId) override; + void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) override; + +private: + TDataShard& Self; + NTable::TDatabase& DB; + bool HasChanges_ = false; +}; + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index e5c976dd352..d398647ff9e 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -147,6 +147,64 @@ namespace NKqpHelpers { return request; } + inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query) { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); + } + if (response.GetResponse().GetResults().size() == 0) { + return "<empty>"; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + } + + inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) { + auto reqSender = runtime.AllocateEdgeActor(); + sessionId = CreateSession(runtime, reqSender); + auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); + } + txId = response.GetResponse().GetTxMeta().id(); + if (response.GetResponse().GetResults().size() == 0) { + return "<empty>"; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + } + + inline TString KqpSimpleContinue(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); + } + if (response.GetResponse().GetResults().size() == 0) { + return "<empty>"; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + } + + inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeCommitRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); + } + if (response.GetResponse().GetResults().size() == 0) { + return "<empty>"; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + } + } // namespace NKqpHelpers } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp index 6bc76ddf396..e7724d352f0 100644 --- a/ydb/core/tx/datashard/datashard_ut_locks.cpp +++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp @@ -275,9 +275,9 @@ namespace NTest { static void RemoveLock(TLockTester& tester, ui64 lockTxId) { TLocksUpdate txLocks; - tester.StartTx(lockTxId, txLocks); + tester.StartTx(txLocks); - txLocks.EraseLock(lockTxId); + tester.EraseLock(lockTxId); tester.ApplyTxLocks(); } @@ -362,19 +362,9 @@ Y_UNIT_TEST(MvccTestOutdatedLocksRemove) { tester.PromoteCompleteVersion(TRowVersion(1, 10)); { - // lock is ready to be deleted but still in place - TLocksUpdate update; - update.CheckVersion = TRowVersion(1, 5); - tester.StartTx(update); - UNIT_ASSERT(tester.CheckLock(10)); - tester.ApplyTxLocks(); - } - - { - // erase triggers outdated locks cleanup + // next lock operation causes cleanup TLocksUpdate update; tester.StartTx(update); - tester.EraseLock(12); tester.ApplyTxLocks(); } diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 1da604332a4..92c2dfa267c 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1471,95 +1471,27 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "} Struct { Bool: false }"); } - Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) { - TPortManager pm; - TServerSettings::TControls controls; - controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); - controls.MutableDataShardControls()->SetEnableLockedWrites(1); - - TServerSettings serverSettings(pm.GetPort(2134)); - serverSettings.SetDomainName("Root") - .SetUseRealThreads(false) - .SetEnableMvcc(true) - .SetEnableMvccSnapshotReads(true) - .SetEnableKqpSessionActor(UseNewEngine) - .SetControls(controls); - - Tests::TServer::TPtr server = new TServer(serverSettings); - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); - - InitRoot(server, sender); - - TDisableDataShardLogBatching disableDataShardLogBatching; - CreateShardedTable(server, sender, "/Root", "table-1", 1); - - ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); - - SimulateSleep(server, TDuration::Seconds(1)); - - auto execSimpleRequest = [&](const TString& query, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), expectedStatus); - if (response.GetResponse().GetResults().size() == 0) { - return ""; - } - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); - }; - - auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - sessionId = CreateSession(runtime, reqSender); - auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); - auto& response = ev->Get()->Record.GetRef(); - UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); - txId = response.GetResponse().GetTxMeta().id(); - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); - }; + struct TLockSnapshot { + ui64 LockId = 0; + ui32 LockNodeId = 0; + TRowVersion MvccSnapshot = TRowVersion::Min(); + }; - auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); - auto& response = ev->Get()->Record.GetRef(); - if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); - } - if (response.GetResponse().GetResults().size() == 0) { - return ""; - } - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); - }; + class TInjectLockSnapshotObserver { + public: + TInjectLockSnapshotObserver(TTestActorRuntime& runtime) + : Runtime(runtime) + { + PrevObserver = runtime.SetObserverFunc([this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + return this->Process(ev); + }); + } - auto commitSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { - auto reqSender = runtime.AllocateEdgeActor(); - auto ev = ExecRequest(runtime, reqSender, MakeCommitRequest(sessionId, txId, query)); - auto& response = ev->Get()->Record.GetRef(); - if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); - } - if (response.GetResponse().GetResults().size() == 0) { - return ""; - } - UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); - return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); - }; + ~TInjectLockSnapshotObserver() { + Runtime.SetObserverFunc(PrevObserver); + } - ui64 lastLockTxId = 0; - ui32 lastLockNodeId = 0; - TRowVersion lastMvccSnapshot = TRowVersion::Min(); - ui64 injectLockTxId = 0; - ui32 injectLockNodeId = 0; - TRowVersion injectMvccSnapshot = TRowVersion::Min(); - auto capturePropose = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + TTestActorRuntime::EEventAction Process(TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { case TEvDataShard::TEvProposeTransaction::EventType: { auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record; @@ -1590,37 +1522,79 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } } } + Last = {}; if (tx.GetLockTxId()) { - lastLockTxId = tx.GetLockTxId(); - lastLockNodeId = tx.GetLockNodeId(); - } else if (injectLockTxId) { - tx.SetLockTxId(injectLockTxId); - if (injectLockNodeId) { - tx.SetLockNodeId(injectLockNodeId); + Last.LockId = tx.GetLockTxId(); + Last.LockNodeId = tx.GetLockNodeId(); + } else if (Inject.LockId) { + tx.SetLockTxId(Inject.LockId); + if (Inject.LockNodeId) { + tx.SetLockNodeId(Inject.LockNodeId); } TString txBody; Y_VERIFY(tx.SerializeToString(&txBody)); record.SetTxBody(txBody); } if (record.HasMvccSnapshot()) { - lastMvccSnapshot.Step = record.GetMvccSnapshot().GetStep(); - lastMvccSnapshot.TxId = record.GetMvccSnapshot().GetTxId(); - } else if (injectMvccSnapshot) { - record.MutableMvccSnapshot()->SetStep(injectMvccSnapshot.Step); - record.MutableMvccSnapshot()->SetTxId(injectMvccSnapshot.TxId); + Last.MvccSnapshot.Step = record.GetMvccSnapshot().GetStep(); + Last.MvccSnapshot.TxId = record.GetMvccSnapshot().GetTxId(); + } else if (Inject.MvccSnapshot) { + record.MutableMvccSnapshot()->SetStep(Inject.MvccSnapshot.Step); + record.MutableMvccSnapshot()->SetTxId(Inject.MvccSnapshot.TxId); } } break; } } - return TTestActorRuntime::EEventAction::PROCESS; - }; - auto prevObserverFunc = runtime.SetObserverFunc(capturePropose); + return PrevObserver(Runtime, ev); + } + + private: + TTestActorRuntime& Runtime; + TTestActorRuntime::TEventObserver PrevObserver; + + public: + TLockSnapshot Last; + TLockSnapshot Inject; + }; + + Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) { + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); // Start a snapshot read transaction TString sessionId, txId; UNIT_ASSERT_VALUES_EQUAL( - beginSnapshotRequest(sessionId, txId, Q_(R"( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3 ORDER BY key @@ -1630,25 +1604,18 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "} Struct { Bool: false }"); // We should have been acquiring locks - Y_VERIFY(lastLockTxId != 0); - ui64 snapshotLockTxId = lastLockTxId; - ui32 snapshotLockNodeId = lastLockNodeId; - Y_VERIFY(lastMvccSnapshot); - auto snapshotVersion = lastMvccSnapshot; + TLockSnapshot snapshot = observer.Last; + Y_VERIFY(snapshot.LockId != 0); + Y_VERIFY(snapshot.MvccSnapshot); // Perform an immediate write, pretending it happens as part of the above snapshot tx - injectLockTxId = snapshotLockTxId; - injectLockNodeId = snapshotLockNodeId; - injectMvccSnapshot = snapshotVersion; + observer.Inject = snapshot; UNIT_ASSERT_VALUES_EQUAL( - execSimpleRequest(Q_(R"( + KqpSimpleExec(runtime, Q_(R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2) - )"), - UseNewEngine ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::UNAVAILABLE), - ""); - injectLockTxId = 0; - injectLockNodeId = 0; - injectMvccSnapshot = TRowVersion::Min(); + )")), + UseNewEngine ? "<empty>" : "ERROR: UNAVAILABLE"); + observer.Inject = {}; // Old engine doesn't support LockNodeId // There's nothing to test unless we can write uncommitted data @@ -1659,7 +1626,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { // Start another snapshot read, it should not see above write (it's uncommitted) TString sessionId2, txId2; UNIT_ASSERT_VALUES_EQUAL( - beginSnapshotRequest(sessionId2, txId2, Q_(R"( + KqpSimpleBegin(runtime, sessionId2, txId2, Q_(R"( SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3 ORDER BY key @@ -1670,7 +1637,7 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { // Perform another read using the first snapshot tx, it must see its own writes UNIT_ASSERT_VALUES_EQUAL( - continueSnapshotRequest(sessionId, txId, Q_(R"( + KqpSimpleContinue(runtime, sessionId, txId, Q_(R"( SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3 ORDER BY key @@ -1682,17 +1649,17 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { // Now commit with additional changes (temporarily needed to trigger lock commits) UNIT_ASSERT_VALUES_EQUAL( - commitSnapshotRequest(sessionId, txId, Q_(R"( + KqpSimpleCommit(runtime, sessionId, txId, Q_(R"( UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3) )")), - ""); + "<empty>"); if (UseNewEngine) { // Verify new snapshots observe all committed changes // This is only possible with new engine at this time TString sessionId3, txId3; UNIT_ASSERT_VALUES_EQUAL( - beginSnapshotRequest(sessionId3, txId3, Q_(R"( + KqpSimpleBegin(runtime, sessionId3, txId3, Q_(R"( SELECT key, value FROM `/Root/table-1` WHERE key >= 1 AND key <= 3 ORDER BY key @@ -1705,6 +1672,120 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } } + Y_UNIT_TEST(MvccSnapshotLockedWritesRestart) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We should have been acquiring locks + TLockSnapshot snapshot = observer.Last; + Y_VERIFY(snapshot.LockId != 0); + Y_VERIFY(snapshot.MvccSnapshot); + + // Perform an immediate write, pretending it happens as part of the above snapshot tx + // We expect read lock to be upgraded to write lock and become persistent + observer.Inject = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2) + )")), + "<empty>"); + observer.Inject = {}; + + // Reboot tablet, persistent locks must not be lost + RebootTablet(runtime, shards1[0], sender); + + // Start another snapshot read, it should not see above write (it's uncommitted) + TString sessionId2, txId2; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId2, txId2, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Perform another read using the first snapshot tx, it must see its own writes + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "} Struct { Bool: false }"); + + // Now commit with additional changes (temporarily needed to trigger lock commits) + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3) + )")), + "<empty>"); + + // Verify new snapshots observe all committed changes + // This is only possible with new engine at this time + TString sessionId3, txId3; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId3, txId3, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index d237bbdbcb6..a9d840cace0 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -2,6 +2,7 @@ #include "datashard_pipeline.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" +#include "datashard_locks_db.h" namespace NKikimr { namespace NDataShard { @@ -29,7 +30,8 @@ public: op->MvccReadWriteVersion.reset(); } - TSetupSysLocks guardLocks(op, DataShard); + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get()); Y_VERIFY(tx != nullptr); diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp index 0ae61b8af80..e1949849105 100644 --- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp @@ -3,6 +3,7 @@ #include "datashard_pipeline.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" +#include "datashard_locks_db.h" namespace NKikimr { namespace NDataShard { @@ -29,7 +30,8 @@ public: TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); - TSetupSysLocks guardLocks(op, DataShard); + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); const auto& commitTx = tx->GetCommitWritesTx()->GetBody(); const auto versions = DataShard.GetReadWriteVersions(op.Get()); diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index b6ac426d252..4f9d2066994 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -1,6 +1,7 @@ #include "datashard_kqp.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" +#include "datashard_locks_db.h" namespace NKikimr { @@ -65,7 +66,8 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, op->MvccReadWriteVersion.reset(); } - TSetupSysLocks guardLocks(op, DataShard); + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); @@ -161,9 +163,14 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, DataShard.IncCounter(COUNTER_WAIT_TOTAL_LATENCY_MS, waitTotalLatency.MilliSeconds()); op->ResetCurrentTimer(); - if (op->IsReadOnly()) + if (op->IsReadOnly() && !locksDb.HasChanges()) return EExecutionStatus::Executed; + if (locksDb.HasChanges()) { + // We made some changes to locks db, make sure we wait for commit + op->SetWaitCompletionFlag(true); + } + return EExecutionStatus::ExecutedNoMoreRestarts; } diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index bc6fda112e0..d15c7e551cf 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -5,6 +5,7 @@ #include "datashard_pipeline.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" +#include "datashard_locks_db.h" #include <util/generic/bitmap.h> @@ -35,7 +36,8 @@ public: TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); - TSetupSysLocks guardLocks(op, DataShard); + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); const auto& eraseTx = tx->GetDistributedEraseTx(); const auto& request = eraseTx->GetRequest(); diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index b60562af1a2..5ff35cd413e 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -3,6 +3,7 @@ #include "datashard_pipeline.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" +#include "datashard_locks_db.h" #include <ydb/core/engine/minikql/minikql_engine_host.h> #include <ydb/core/kqp/rm/kqp_rm.h> @@ -67,7 +68,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio op->MvccReadWriteVersion.reset(); } - TSetupSysLocks guardLocks(op, DataShard); + TDataShardLocksDb locksDb(DataShard, txc); + TSetupSysLocks guardLocks(op, DataShard, &locksDb); TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); @@ -134,10 +136,13 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) { - KqpRollbackLockChanges(tabletId, tx, DataShard, txc); KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); DataShard.SysLocksTable().ApplyLocks(); DataShard.SubscribeNewLocks(ctx); + if (locksDb.HasChanges()) { + op->SetWaitCompletionFlag(true); + return EExecutionStatus::ExecutedNoMoreRestarts; + } return EExecutionStatus::Executed; } @@ -226,7 +231,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio DataShard.IncCounter(COUNTER_WAIT_TOTAL_LATENCY_MS, waitTotalLatency.MilliSeconds()); op->ResetCurrentTimer(); - return op->IsReadOnly() ? EExecutionStatus::Executed : EExecutionStatus::ExecutedNoMoreRestarts; + if (op->IsReadOnly() && !locksDb.HasChanges()) { + return EExecutionStatus::Executed; + } + + if (locksDb.HasChanges()) { + op->SetWaitCompletionFlag(true); + } + + return EExecutionStatus::ExecutedNoMoreRestarts; } void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) { diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 6d9a0b5b459..59a90ced678 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -444,8 +444,7 @@ struct TOutputOpData { TDelayedAcks DelayedAcks; TOutReadSets OutReadSets; TVector<THolder<TEvTxProcessing::TEvReadSet>> PreparedOutReadSets; - // Updates and checked locks. - TLocksUpdate LocksUpdate; + // Access log of checked locks TLocksCache LocksAccessLog; // Collected change records TVector<TChangeRecord> ChangeRecords; @@ -604,7 +603,6 @@ public: return OutputDataRef().PreparedOutReadSets; } - TLocksUpdate &LocksUpdate() { return OutputDataRef().LocksUpdate; } TLocksCache &LocksAccessLog() { return OutputDataRef().LocksAccessLog; } TVector<TOutputOpData::TChangeRecord> &ChangeRecords() { return OutputDataRef().ChangeRecords; } diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index dcd4eb6eb94..47b71bc6f96 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -159,7 +159,6 @@ public: bool IsHeadRead = false; ui64 LockTxId = 0; TLockInfo::TPtr Lock; - bool ReportedLockBroken = false; // note that will be always overwritten by values from request NKikimrTxDataShard::EScanDataFormat Format = NKikimrTxDataShard::EScanDataFormat::CELLVEC; diff --git a/ydb/core/tx/datashard/remove_locks.cpp b/ydb/core/tx/datashard/remove_locks.cpp index f05ab6d7b81..d0204f18939 100644 --- a/ydb/core/tx/datashard/remove_locks.cpp +++ b/ydb/core/tx/datashard/remove_locks.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" namespace NKikimr::NDataShard { @@ -16,20 +17,11 @@ public: TTxType GetTxType() const override { return TXTYPE_REMOVE_LOCK; } bool Execute(TTransactionContext& txc, const TActorContext&) override { - // Remove any uncommitted changes with this lock id - // FIXME: if a distributed tx has already validated (and persisted) - // its locks, we must preserve uncommitted changes even when lock is - // removed on the originating node, since the final outcome may - // actually decide to commit. - for (const auto& pr : Self->GetUserTables()) { - auto localTid = pr.second->LocalTid; - if (txc.DB.HasOpenTx(localTid, LockId)) { - txc.DB.RemoveTx(localTid, LockId); - } - } - // Remove the lock from memory, it's no longer needed - Self->SysLocks.RemoveSubscribedLock(LockId); + // Note: locksDb will also remove uncommitted changes + // when removing a persistent lock. + TDataShardLocksDb locksDb(*Self, txc); + Self->SysLocks.RemoveSubscribedLock(LockId, &locksDb); return true; } @@ -57,6 +49,10 @@ void TDataShard::Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorC } void TDataShard::SubscribeNewLocks(const TActorContext&) { + SubscribeNewLocks(); +} + +void TDataShard::SubscribeNewLocks() { while (auto pendingSubscribeLock = SysLocks.NextPendingSubscribeLock()) { Send(MakeLongTxServiceID(SelfId().NodeId()), new TEvLongTxService::TEvSubscribeLock( diff --git a/ydb/core/tx/datashard/setup_sys_locks.h b/ydb/core/tx/datashard/setup_sys_locks.h index 026ba14f41d..9cfd6bff8af 100644 --- a/ydb/core/tx/datashard/setup_sys_locks.h +++ b/ydb/core/tx/datashard/setup_sys_locks.h @@ -6,18 +6,41 @@ namespace NKikimr { namespace NDataShard { -struct TSetupSysLocks { +struct TSetupSysLocks + : public TLocksUpdate +{ TSysLocks &SysLocksTable; - TSetupSysLocks(TOperation::TPtr op, - TDataShard &self) + TSetupSysLocks(TDataShard& self, ILocksDb* db) + : SysLocksTable(self.SysLocksTable()) + { + CheckVersion = TRowVersion::Min(); + BreakVersion = TRowVersion::Min(); + + SysLocksTable.SetTxUpdater(this); + SysLocksTable.SetDb(db); + } + + TSetupSysLocks(ui64 lockTxId, ui32 lockNodeId, const TRowVersion& readVersion, + TDataShard& self, ILocksDb* db) : SysLocksTable(self.SysLocksTable()) { - TLocksUpdate &update = op->LocksUpdate(); + LockTxId = lockTxId; + LockNodeId = lockNodeId; + CheckVersion = readVersion; + BreakVersion = TRowVersion::Min(); - update.Clear(); - update.LockTxId = op->LockTxId(); - update.LockNodeId = op->LockNodeId(); + SysLocksTable.SetTxUpdater(this); + SysLocksTable.SetDb(db); + } + + TSetupSysLocks(TOperation::TPtr op, + TDataShard &self, + ILocksDb* db) + : SysLocksTable(self.SysLocksTable()) + { + LockTxId = op->LockTxId(); + LockNodeId = op->LockNodeId(); if (self.IsMvccEnabled()) { auto [readVersion, writeVersion] = self.GetReadWriteVersions(op.Get()); @@ -29,21 +52,23 @@ struct TSetupSysLocks { outOfOrder = true; } - update.CheckVersion = readVersion; - update.BreakVersion = outOfOrder ? writeVersion : TRowVersion::Min(); + CheckVersion = readVersion; + BreakVersion = outOfOrder ? writeVersion : TRowVersion::Min(); } - SysLocksTable.SetTxUpdater(&update); + SysLocksTable.SetTxUpdater(this); if (!op->LocksCache().Locks.empty()) SysLocksTable.SetCache(&op->LocksCache()); else SysLocksTable.SetAccessLog(&op->LocksAccessLog()); + SysLocksTable.SetDb(db); } ~TSetupSysLocks() { SysLocksTable.SetTxUpdater(nullptr); SysLocksTable.SetCache(nullptr); SysLocksTable.SetAccessLog(nullptr); + SysLocksTable.SetDb(nullptr); } }; diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 05d887b602f..9db6a1eaef8 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1550,5 +1550,178 @@ "Blobs": 1 } } + }, + { + "TableId": 29, + "TableName": "Locks", + "TableKey": [ + 1 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "LockId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "LockNodeId", + "ColumnType": "Uint32" + }, + { + "ColumnId": 3, + "ColumnName": "Generation", + "ColumnType": "Uint32" + }, + { + "ColumnId": 4, + "ColumnName": "Counter", + "ColumnType": "Uint64" + }, + { + "ColumnId": 5, + "ColumnName": "CreateTimestamp", + "ColumnType": "Uint64" + }, + { + "ColumnId": 6, + "ColumnName": "Flags", + "ColumnType": "Uint64" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5, + 6 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { + "TableId": 30, + "TableName": "LockRanges", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "LockId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "RangeId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "PathOwnerId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 4, + "ColumnName": "LocalPathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 5, + "ColumnName": "Flags", + "ColumnType": "Uint64" + }, + { + "ColumnId": 6, + "ColumnName": "Data", + "ColumnType": "String" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5, + 6 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { + "TableId": 31, + "TableName": "LockConflicts", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "LockId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "ConflictId", + "ColumnType": "Uint64" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } } ]
\ No newline at end of file |