diff options
author | snaury <snaury@ydb.tech> | 2022-10-06 12:37:18 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-10-06 12:37:18 +0300 |
commit | a9fdd4fb908d14a055b3a536c523d679391a70b3 (patch) | |
tree | 04395fdaa7d992729ce33741c04ea528b5a5ef35 | |
parent | 92515b8e8d1c011fcbd3916230ef55e294bf9be5 (diff) | |
download | ydb-a9fdd4fb908d14a055b3a536c523d679391a70b3.tar.gz |
Clean uncommitted changes on split and copy table
23 files changed, 534 insertions, 8 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index f002b678fa4..d415560d6ef 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -306,6 +306,7 @@ message TSnapshotTransferReadSet { optional uint64 MvccLowWatermarkTxId = 9; optional uint64 MvccImmediateWriteEdgeStep = 10; optional uint64 MvccImmediateWriteEdgeTxId = 11; + optional bool WithOpenTxs = 12; } message TSnapshotTransferInfo { diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 6d705ab3de0..8cb61e0fee7 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -374,6 +374,11 @@ bool TDatabase::HasRemovedTx(ui32 table, ui64 txId) const return Require(table)->HasRemovedTx(txId); } +TVector<ui64> TDatabase::GetOpenTxs(ui32 table) const +{ + return Require(table)->GetOpenTxs(); +} + void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper) { if (Y_LIKELY(lower < upper)) { diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index c714d44f3ed..a30e4190fa8 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -128,6 +128,13 @@ public: bool HasRemovedTx(ui32 table, ui64 txId) const; /** + * Returns a list of open transactions in the provided table. This only + * includes transactions with changes that are neither committed nor + * removed. + */ + TVector<ui64> GetOpenTxs(ui32 table) const; + + /** * Remove row versions [lower, upper) from the given table * * Once committed this cannot be undone. This is a hint to the underlying diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index c52869ded54..5e976d64c2e 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -911,6 +911,20 @@ bool TTable::HasRemovedTx(ui64 txId) const return RemovedTransactions.Contains(txId); } +TVector<ui64> TTable::GetOpenTxs() const +{ + TVector<ui64> txs; + + for (auto& pr : TxRefs) { + ui64 txId = pr.first; + if (!CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId)) { + txs.push_back(txId); + } + } + + return txs; +} + TMemTable& TTable::MemTable() { if (!Mutable) { diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 1b9992de118..54430fcfbc9 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -172,6 +172,8 @@ public: bool HasCommittedTx(ui64 txId) const; bool HasRemovedTx(ui64 txId) const; + TVector<ui64> GetOpenTxs() const; + TPartView GetPartView(const TLogoBlobID &bundle) const { auto *partView = Flatten.FindPtr(bundle); diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 8c50eba3293..f7f36a6afb4 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -220,6 +220,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_locks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_avl_tree.cpp diff --git a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp index 8aef437313e..95dd8b28f05 100644 --- a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp @@ -63,6 +63,9 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op, ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; for (auto &snapshot : op->InputSnapshots()) { + auto* txSnapshot = dynamic_cast<TTxTableSnapshotContext*>(snapshot.Get()); + Y_VERIFY(txSnapshot, "Unexpected input snapshot type"); + TString snapBody = DataShard.BorrowSnapshot(localTableId, *snapshot, { }, { }, targetTablet); txc.Env.DropSnapshot(snapshot); @@ -84,7 +87,7 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op, TRowVersion lowWatermark = mvcc ? DataShard.GetSnapshotManager().GetLowWatermark() : TRowVersion::Min(); - if (minVersion || completeEdge || incompleteEdge || immediateWriteEdge || lowWatermark) + if (minVersion || completeEdge || incompleteEdge || immediateWriteEdge || lowWatermark || txSnapshot->HasOpenTxs()) extended = true; // Must use an extended format if (extended) { @@ -122,6 +125,10 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op, rs.SetMvccLowWatermarkTxId(lowWatermark.TxId); } + if (txSnapshot->HasOpenTxs()) { + rs.SetWithOpenTxs(true); + } + rsBody.reserve(SnapshotTransferReadSetMagic.size() + rs.ByteSizeLong()); rsBody.append(SnapshotTransferReadSetMagic); Y_PROTOBUF_SUPPRESS_NODISCARD rs.AppendToString(&rsBody); diff --git a/ydb/core/tx/datashard/create_table_unit.cpp b/ydb/core/tx/datashard/create_table_unit.cpp index 85f38d456a3..e8b253cd020 100644 --- a/ydb/core/tx/datashard/create_table_unit.cpp +++ b/ydb/core/tx/datashard/create_table_unit.cpp @@ -56,6 +56,11 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op, tableId.LocalPathId = createTableTx.GetPathId().GetLocalId(); } + if (DataShard.HasUserTable(tableId)) { + // Table already created, assume we restarted after a successful commit + return EExecutionStatus::Executed; + } + const ui64 schemaVersion = createTableTx.HasTableSchemaVersion() ? createTableTx.GetTableSchemaVersion() : 0u; LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 9a9b30fbe69..a60dc40a2b1 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -964,6 +964,11 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId)); } +void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) { + LastLoanTableTid = localTid; + PersistSys(db, Schema::Sys_LastLoanTableTid, LastLoanTableTid); +} + TUserTable::TPtr TDataShard::CreateUserTable(TTransactionContext& txc, const NKikimrSchemeOp::TTableDescription& tableScheme) { @@ -3323,6 +3328,38 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl return true; } +class TDataShard::TTxGetOpenTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> { +public: + TTxGetOpenTxs(TDataShard* self, TEvDataShard::TEvGetOpenTxs::TPtr&& ev) + : TTransactionBase(self) + , Ev(std::move(ev)) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + auto pathId = Ev->Get()->PathId; + auto it = pathId ? Self->GetUserTables().find(pathId.LocalPathId) : Self->GetUserTables().begin(); + Y_VERIFY(it != Self->GetUserTables().end()); + + auto txs = txc.DB.GetOpenTxs(it->second->LocalTid); + + Reply = MakeHolder<TEvDataShard::TEvGetOpenTxsResult>(pathId, std::move(txs)); + return true; + } + + void Complete(const TActorContext& ctx) override { + ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie); + } + +private: + TEvDataShard::TEvGetOpenTxs::TPtr Ev; + THolder<TEvDataShard::TEvGetOpenTxsResult> Reply; +}; + +void TDataShard::Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxGetOpenTxs(this, std::move(ev)), ctx); +} + + } // NDataShard TString TEvDataShard::TEvRead::ToString() const { diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 7a7e362aea8..2c295f99005 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -305,6 +305,9 @@ struct TEvDataShard { EvTestLoadResponse, EvTestLoadFinished, + EvGetOpenTxs, /* for tests */ + EvGetOpenTxsResult, /* for tests */ + EvEnd }; @@ -1573,6 +1576,24 @@ struct TEvDataShard { } }; + struct TEvGetOpenTxs : public TEventLocal<TEvGetOpenTxs, EvGetOpenTxs> { + TPathId PathId; + + TEvGetOpenTxs(const TPathId& pathId) + : PathId(pathId) + { } + }; + + struct TEvGetOpenTxsResult : public TEventLocal<TEvGetOpenTxsResult, EvGetOpenTxsResult> { + TPathId PathId; + TVector<ui64> OpenTxs; + + TEvGetOpenTxsResult(const TPathId& pathId, TVector<ui64> openTxs) + : PathId(pathId) + , OpenTxs(std::move(openTxs)) + { } + }; + }; IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info); diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 7d296877ca0..3f2d579d6a0 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -21,6 +21,7 @@ bool TDataShard::TTxInit::Execute(TTransactionContext& txc, const TActorContext& try { Self->State = TShardState::Unknown; Self->LastLocalTid = Schema::MinLocalTid; + Self->LastLoanTableTid = 0; Self->LastSeqno = 1; Self->NextChangeRecordOrder = 1; Self->LastChangeRecordGroup = 1; @@ -178,6 +179,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { // Reads from Sys table LOAD_SYS_UI64(db, Schema::Sys_State, Self->State); LOAD_SYS_UI64(db, Schema::Sys_LastLocalTid, Self->LastLocalTid); + LOAD_SYS_UI64(db, Schema::Sys_LastLoanTableTid, Self->LastLoanTableTid); LOAD_SYS_UI64(db, Schema::Sys_LastSeqno, Self->LastSeqno); LOAD_SYS_UI64(db, Schema::Sys_NextChangeRecordOrder, Self->NextChangeRecordOrder); LOAD_SYS_UI64(db, Schema::Sys_LastChangeRecordGroup, Self->LastChangeRecordGroup); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 6904293b1e4..159d0b516f2 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -859,6 +859,7 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded) plan.push_back(EExecutionUnitKind::Restore); plan.push_back(EExecutionUnitKind::CreateTable); plan.push_back(EExecutionUnitKind::ReceiveSnapshot); + plan.push_back(EExecutionUnitKind::ReceiveSnapshotCleanup); plan.push_back(EExecutionUnitKind::AlterMoveShadow); plan.push_back(EExecutionUnitKind::AlterTable); plan.push_back(EExecutionUnitKind::DropTable); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 2ec648f65af..01333c30e66 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -60,9 +60,10 @@ using NLongTxService::TEvLongTxService; // For CopyTable and MoveShadow class TTxTableSnapshotContext : public NTabletFlatExecutor::TTableSnapshotContext { public: - TTxTableSnapshotContext(ui64 step, ui64 txId, TVector<ui32>&& tables) + TTxTableSnapshotContext(ui64 step, ui64 txId, TVector<ui32>&& tables, bool hasOpenTxs = false) : StepOrder(step, txId) , Tables(tables) + , HasOpenTxs_(hasOpenTxs) {} const TStepOrder& GetStepOrder() const { @@ -73,9 +74,14 @@ public: return Tables; } + bool HasOpenTxs() const { + return HasOpenTxs_; + } + private: TStepOrder StepOrder; TVector<ui32> Tables; + bool HasOpenTxs_; }; // For Split @@ -207,6 +213,7 @@ class TDataShard class TTxCompactTable; class TTxPersistFullCompactionTs; class TTxRemoveLock; + class TTxGetOpenTxs; template <typename T> friend class TTxDirectBase; class TTxUploadRows; @@ -870,6 +877,8 @@ class TDataShard SysMvcc_ImmediateWriteEdgeStep, // 39 Maximum step of immediate writes with mvcc enabled SysMvcc_ImmediateWriteEdgeTxId, // 40 Maximum txId of immediate writes with mvcc enabled + Sys_LastLoanTableTid, // 41 Last tid that we used in LoanTable + // reserved SysPipeline_Flags = 1000, SysPipeline_LimitActiveTx, @@ -882,6 +891,7 @@ class TDataShard static_assert(ESysTableKeys::SysMvcc_UnprotectedReads == 38, "SysMvcc_UnprotectedReads changed its value"); static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeStep == 39, "SysMvcc_ImmediateWriteEdgeStep changed its value"); static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeTxId == 40, "SysMvcc_ImmediateWriteEdgeTxId changed its value"); + static_assert(ESysTableKeys::Sys_LastLoanTableTid == 41, "Sys_LastLoanTableTid changed its value"); static constexpr ui64 MinLocalTid = TSysTables::SysTableMAX + 1; // 1000 @@ -1064,6 +1074,8 @@ class TDataShard void Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -1248,6 +1260,18 @@ public: return prefix; } + bool HasUserTable(const TPathId& tableId) { + return TableInfos.contains(tableId.LocalPathId); + } + + TUserTable::TCPtr FindUserTable(const TPathId& tableId) { + auto it = TableInfos.find(tableId.LocalPathId); + if (it != TableInfos.end()) { + return it->second; + } + return nullptr; + } + void RemoveUserTable(const TPathId& tableId) { TableInfos.erase(tableId.LocalPathId); SysLocks.RemoveSchema(tableId); @@ -1489,6 +1513,9 @@ public: void DropUserTable(TTransactionContext& txc, ui64 tableId); ui32 GetLastLocalTid() const { return LastLocalTid; } + ui32 GetLastLoanTableTid() const { return LastLoanTableTid; } + + void PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid); ui64 AllocateChangeRecordOrder(NIceDb::TNiceDb& db); ui64 AllocateChangeRecordGroup(NIceDb::TNiceDb& db); @@ -2103,6 +2130,7 @@ private: // Sys table contents ui32 State; ui32 LastLocalTid; + ui32 LastLoanTableTid; ui64 LastSeqno; ui64 NextChangeRecordOrder; ui64 LastChangeRecordGroup; @@ -2465,6 +2493,7 @@ protected: fFunc(TEvDataShard::EvReplicationSourceOffsetsAck, HandleByReplicationSourceOffsetsServer); fFunc(TEvDataShard::EvReplicationSourceOffsetsCancel, HandleByReplicationSourceOffsetsServer); HFunc(TEvLongTxService::TEvLockStatus, Handle); + HFunc(TEvDataShard::TEvGetOpenTxs, Handle); default: if (!HandleDefaultEvents(ev, ctx)) { LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index 999ddd5973e..a48b97301a0 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -876,6 +876,13 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const { return TLock(); } +void TSysLocks::EraseLock(ui64 lockId) { + Y_VERIFY(Update); + if (auto* lock = Locker.FindLockPtr(lockId)) { + Update->AddEraseLock(lock); + } +} + void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) { Y_VERIFY(Update); if (auto* lock = Locker.FindLockPtr(GetLockId(key))) { diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index e9e844a9242..7d65c115aba 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -595,6 +595,10 @@ public: PendingSubscribeLocks.clear(); } + const THashMap<ui64, TLockInfo::TPtr>& GetLocks() const { + return Locks; + } + private: const THolder<TLocksDataShard> Self; THashMap<ui64, TLockInfo::TPtr> Locks; // key is LockId @@ -799,6 +803,7 @@ public: TVector<TLock> ApplyLocks(); ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const; TLock GetLock(const TArrayRef<const TCell>& syslockKey) const; + void EraseLock(ui64 lockId); void EraseLock(const TArrayRef<const TCell>& syslockKey); void CommitLock(const TArrayRef<const TCell>& syslockKey); void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key); @@ -856,6 +861,10 @@ public: bool Load(ILocksDb& db); + const THashMap<ui64, TLockInfo::TPtr>& GetLocks() const { + return Locker.GetLocks(); + } + private: THolder<TLocksDataShard> Self; TLockLocker Locker; diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index eba1ea0a0fa..f762801c221 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -1,4 +1,6 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" +#include "setup_sys_locks.h" #include <ydb/core/tablet_flat/tablet_flat_executor.h> #include <ydb/core/util/pb.h> @@ -127,6 +129,27 @@ public: Y_VERIFY(Self->TxInFly() == 0, "Currently split operation shouldn't start while there are in-flight transactions"); + // We need to remove all locks first, making sure persistent uncommitted + // changes are not borrowed by new shards. Otherwise those will become + // unaccounted for. + if (!Self->SysLocksTable().GetLocks().empty()) { + auto countBefore = Self->SysLocksTable().GetLocks().size(); + TDataShardLocksDb locksDb(*Self, txc); + TSetupSysLocks guardLocks(*Self, &locksDb); + for (auto& pr : Self->SysLocksTable().GetLocks()) { + Self->SysLocksTable().EraseLock(pr.first); + if (pr.second->IsPersistent()) { + // Don't erase more than one persistent lock at a time + break; + } + } + Self->SysLocksTable().ApplyLocks(); + auto countAfter = Self->SysLocksTable().GetLocks().size(); + Y_VERIFY(countAfter < countBefore, "Expected to erase at least one lock"); + Self->Execute(Self->CreateTxStartSplit(), ctx); + return true; + } + ui64 opId = Self->SrcSplitOpId; LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " starting snapshot for split OpId " << opId); diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index d86aeb3b27f..cab83571ed4 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -3268,6 +3268,243 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "} Struct { Bool: false }"); } + Y_UNIT_TEST(LockedWriteCleanupOnSplit) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + // Check shard has some open transactions + { + auto checkSender = runtime.AllocateEdgeActor(); + runtime.SendToPipe(shards.at(0), checkSender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId)); + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender); + UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0)); + } + + // Split/merge would fail otherwise :( + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + + // Split table in two shards + { + //Cerr << "----Split Begin----" << Endl; + auto senderSplit = runtime.AllocateEdgeActor(); + auto tablets = GetTableShards(server, senderSplit, "/Root/table-1"); + ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/table-1", tablets.at(0), 3); + WaitTxNotification(server, senderSplit, txId); + //Cerr << "----Split End----" << Endl; + } + + shards = GetTableShards(server, sender, "/Root/table-1"); + + // Check new shards don't have any open transactions + { + auto checkSender = runtime.AllocateEdgeActor(); + for (auto shardId : shards) { + runtime.SendToPipe(shardId, checkSender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId)); + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender); + UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shardId); + } + } + } + + Y_UNIT_TEST(LockedWriteCleanupOnCopyTable) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to key 2 with tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + auto tableId = ResolveTableId(server, sender, "/Root/table-1"); + + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + // Check shard has some open transactions + { + auto checkSender = runtime.AllocateEdgeActor(); + runtime.SendToPipe(shards.at(0), checkSender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId)); + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender); + UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0)); + } + + // Copy table + { + auto senderCopy = runtime.AllocateEdgeActor(); + ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-2", "/Root/table-1"); + WaitTxNotification(server, senderCopy, txId); + } + + auto shards2 = GetTableShards(server, sender, "/Root/table-2"); + auto tableId2 = ResolveTableId(server, sender, "/Root/table-2"); + + // Check new shards don't have any open transactions + { + auto checkSender = runtime.AllocateEdgeActor(); + for (auto shardId : shards2) { + runtime.SendToPipe(shardId, checkSender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId)); + auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender); + UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shardId); + } + } + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // Check original table has those changes visible + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } " + "} Struct { Bool: false }"); + + // Check table copy does not have those changes + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-2` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp index c4dd0c88622..5e8c32a6582 100644 --- a/ydb/core/tx/datashard/execution_unit.cpp +++ b/ydb/core/tx/datashard/execution_unit.cpp @@ -94,6 +94,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind, return CreateCreateTableUnit(dataShard, pipeline); case EExecutionUnitKind::ReceiveSnapshot: return CreateReceiveSnapshotUnit(dataShard, pipeline); + case EExecutionUnitKind::ReceiveSnapshotCleanup: + return CreateReceiveSnapshotCleanupUnit(dataShard, pipeline); case EExecutionUnitKind::AlterMoveShadow: return CreateAlterMoveShadowUnit(dataShard, pipeline); case EExecutionUnitKind::AlterTable: diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h index e03c77e2c2a..2585a2d9b8e 100644 --- a/ydb/core/tx/datashard/execution_unit_ctors.h +++ b/ydb/core/tx/datashard/execution_unit_ctors.h @@ -47,6 +47,7 @@ THolder<TExecutionUnit> CreateBackupUnit(TDataShard &dataShard, TPipeline &pipel THolder<TExecutionUnit> CreateRestoreUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateCreateTableUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateReceiveSnapshotUnit(TDataShard &dataShard, TPipeline &pipeline); +THolder<TExecutionUnit> CreateReceiveSnapshotCleanupUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateAlterMoveShadowUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateAlterTableUnit(TDataShard &dataShard, TPipeline &pipeline); THolder<TExecutionUnit> CreateDropTableUnit(TDataShard &dataShard, TPipeline &pipeline); diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h index 832854d966a..a96aaaaf214 100644 --- a/ydb/core/tx/datashard/execution_unit_kind.h +++ b/ydb/core/tx/datashard/execution_unit_kind.h @@ -49,6 +49,7 @@ enum class EExecutionUnitKind : ui32 { Restore, CreateTable, ReceiveSnapshot, + ReceiveSnapshotCleanup, AlterMoveShadow, AlterTable, DropTable, diff --git a/ydb/core/tx/datashard/make_snapshot_unit.cpp b/ydb/core/tx/datashard/make_snapshot_unit.cpp index 8532f04d29e..4321daa414b 100644 --- a/ydb/core/tx/datashard/make_snapshot_unit.cpp +++ b/ydb/core/tx/datashard/make_snapshot_unit.cpp @@ -59,8 +59,9 @@ EExecutionStatus TMakeSnapshotUnit::Execute(TOperation::TPtr op, Y_VERIFY(DataShard.GetUserTables().contains(tableId)); ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid; + auto openTxs = txc.DB.GetOpenTxs(localTableId); TIntrusivePtr<TTableSnapshotContext> snapContext - = new TTxTableSnapshotContext(op->GetStep(), op->GetTxId(), {localTableId}); + = new TTxTableSnapshotContext(op->GetStep(), op->GetTxId(), {localTableId}, !openTxs.empty()); txc.Env.MakeSnapshot(snapContext); op->SetWaitingForSnapshotFlag(); diff --git a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp new file mode 100644 index 00000000000..84238821a02 --- /dev/null +++ b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp @@ -0,0 +1,83 @@ +#include "datashard_impl.h" +#include "datashard_pipeline.h" +#include "execution_unit_ctors.h" + +namespace NKikimr { +namespace NDataShard { + +class TReceiveSnapshotCleanupUnit : public TExecutionUnit { +public: + TReceiveSnapshotCleanupUnit(TDataShard &dataShard, + TPipeline &pipeline); + ~TReceiveSnapshotCleanupUnit() override; + + bool IsReadyToExecute(TOperation::TPtr op) const override; + EExecutionStatus Execute(TOperation::TPtr op, + TTransactionContext &txc, + const TActorContext &ctx) override; + void Complete(TOperation::TPtr op, + const TActorContext &ctx) override; + +private: +}; + +TReceiveSnapshotCleanupUnit::TReceiveSnapshotCleanupUnit(TDataShard &dataShard, + TPipeline &pipeline) + : TExecutionUnit(EExecutionUnitKind::ReceiveSnapshotCleanup, false, dataShard, pipeline) +{ +} + +TReceiveSnapshotCleanupUnit::~TReceiveSnapshotCleanupUnit() +{ +} + +bool TReceiveSnapshotCleanupUnit::IsReadyToExecute(TOperation::TPtr) const +{ + return true; +} + +EExecutionStatus TReceiveSnapshotCleanupUnit::Execute(TOperation::TPtr op, + TTransactionContext &txc, + const TActorContext &) +{ + TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get()); + Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); + + auto &schemeTx = tx->GetSchemeTx(); + if (!schemeTx.HasReceiveSnapshot()) + return EExecutionStatus::Executed; + + size_t removedTxs = 0; + for (const auto& pr : DataShard.GetUserTables()) { + auto localTid = pr.second->LocalTid; + for (ui64 txId : txc.DB.GetOpenTxs(localTid)) { + if (removedTxs >= 1000) { + // We don't want to remove more than 1000 txs at a time + // Commit current changes and reschedule + return EExecutionStatus::Reschedule; + } + txc.DB.RemoveTx(localTid, txId); + ++removedTxs; + } + } + + if (removedTxs > 0) { + return EExecutionStatus::ExecutedNoMoreRestarts; + } + + return EExecutionStatus::Executed; +} + +void TReceiveSnapshotCleanupUnit::Complete(TOperation::TPtr, + const TActorContext &) +{ +} + +THolder<TExecutionUnit> CreateReceiveSnapshotCleanupUnit(TDataShard &dataShard, + TPipeline &pipeline) +{ + return MakeHolder<TReceiveSnapshotCleanupUnit>(dataShard, pipeline); +} + +} // namespace NDataShard +} // namespace NKikimr diff --git a/ydb/core/tx/datashard/receive_snapshot_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_unit.cpp index 427a136b346..45998e31832 100644 --- a/ydb/core/tx/datashard/receive_snapshot_unit.cpp +++ b/ydb/core/tx/datashard/receive_snapshot_unit.cpp @@ -51,16 +51,26 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op, Y_VERIFY(schemeTx.HasCreateTable()); + const auto &createTableTx = schemeTx.GetCreateTable(); + + TPathId tableId(DataShard.GetPathOwnerId(), createTableTx.GetId_Deprecated()); + if (createTableTx.HasPathId()) { + Y_VERIFY(DataShard.GetPathOwnerId() == createTableTx.GetPathId().GetOwnerId()); + tableId.LocalPathId = createTableTx.GetPathId().GetLocalId(); + } + + auto userTable = DataShard.FindUserTable(tableId); + Y_VERIFY(userTable); + const bool mvcc = DataShard.IsMvccEnabled(); + bool hasOpenTxs = false; + bool loanedTables = false; + for (auto &pr : op->InReadSets()) { for (auto& rsdata : pr.second) { NKikimrTxDataShard::TSnapshotTransferReadSet rs; - // We currently support a single readset for a single user table - Y_VERIFY(DataShard.GetUserTables().size() == 1, "Support for more than 1 user table in a datashard is not implemented here"); - ui32 localTableId = DataShard.GetUserTables().begin()->second->LocalTid; - TString snapBody = rsdata.Body; if (rsdata.Body.StartsWith(SnapshotTransferReadSetMagic)) { @@ -90,12 +100,24 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op, if (DataShard.GetSnapshotManager().GetLowWatermark() < lowWatermark) DataShard.GetSnapshotManager().SetLowWatermark(db, lowWatermark); } + + if (rs.GetWithOpenTxs()) { + hasOpenTxs = true; + } } - txc.Env.LoanTable(localTableId, snapBody); + if (userTable->LocalTid != DataShard.GetLastLoanTableTid()) { + txc.Env.LoanTable(userTable->LocalTid, snapBody); + loanedTables = true; + } } } + if (loanedTables) { + // We want to make sure we won't try to loan table again on restart + DataShard.PersistLastLoanTableTid(db, userTable->LocalTid); + } + Y_VERIFY(DataShard.GetSnapshotManager().GetSnapshots().empty(), "Found unexpected persistent snapshots at CopyTable destination"); @@ -110,6 +132,14 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op, } } + if (hasOpenTxs && loanedTables) { + // We must wait for loan to complete, so ReceiveSnapshotCleanup would + // see open transactions that it needs to cleanup. + return EExecutionStatus::WaitComplete; + } + + // Don't break compatibility with previous datashard versions + // Version 22-4 and below relied on all units finishing without interruptions return EExecutionStatus::ExecutedNoMoreRestarts; } |