aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author snaury <snaury@ydb.tech> 2022-10-06 12:37:18 +0300
committer snaury <snaury@ydb.tech> 2022-10-06 12:37:18 +0300
commit a9fdd4fb908d14a055b3a536c523d679391a70b3 (patch)
tree 04395fdaa7d992729ce33741c04ea528b5a5ef35
parent 92515b8e8d1c011fcbd3916230ef55e294bf9be5 (diff)
download ydb-a9fdd4fb908d14a055b3a536c523d679391a70b3.tar.gz
Clean uncommitted changes on split and copy table
-rw-r--r--ydb/core/protos/tx_datashard.proto1
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp5
-rw-r--r--ydb/core/tablet_flat/flat_database.h7
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp14
-rw-r--r--ydb/core/tablet_flat/flat_table.h2
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt1
-rw-r--r--ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp9
-rw-r--r--ydb/core/tx/datashard/create_table_unit.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard.cpp37
-rw-r--r--ydb/core/tx/datashard/datashard.h21
-rw-r--r--ydb/core/tx/datashard/datashard__init.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h31
-rw-r--r--ydb/core/tx/datashard/datashard_locks.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_locks.h9
-rw-r--r--ydb/core/tx/datashard/datashard_split_src.cpp23
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp237
-rw-r--r--ydb/core/tx/datashard/execution_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/execution_unit_ctors.h1
-rw-r--r--ydb/core/tx/datashard/execution_unit_kind.h1
-rw-r--r--ydb/core/tx/datashard/make_snapshot_unit.cpp3
-rw-r--r--ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp83
-rw-r--r--ydb/core/tx/datashard/receive_snapshot_unit.cpp40
23 files changed, 534 insertions, 8 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index f002b678fa4..d415560d6ef 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -306,6 +306,7 @@ message TSnapshotTransferReadSet {
optional uint64 MvccLowWatermarkTxId = 9;
optional uint64 MvccImmediateWriteEdgeStep = 10;
optional uint64 MvccImmediateWriteEdgeTxId = 11;
+ optional bool WithOpenTxs = 12;
}
message TSnapshotTransferInfo {
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 6d705ab3de0..8cb61e0fee7 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -374,6 +374,11 @@ bool TDatabase::HasRemovedTx(ui32 table, ui64 txId) const
return Require(table)->HasRemovedTx(txId);
}
+TVector<ui64> TDatabase::GetOpenTxs(ui32 table) const
+{
+    // Resolve the table through Require() and let it enumerate its own
+    // open (neither committed nor removed) transactions.
+    const auto& target = Require(table);
+    return target->GetOpenTxs();
+}
+
void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper)
{
if (Y_LIKELY(lower < upper)) {
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index c714d44f3ed..a30e4190fa8 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -128,6 +128,13 @@ public:
bool HasRemovedTx(ui32 table, ui64 txId) const;
/**
+ * Returns a list of open transactions in the provided table. This only
+ * includes transactions with changes that are neither committed nor
+ * removed.
+ */
+ TVector<ui64> GetOpenTxs(ui32 table) const;
+
+ /**
* Remove row versions [lower, upper) from the given table
*
* Once committed this cannot be undone. This is a hint to the underlying
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index c52869ded54..5e976d64c2e 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -911,6 +911,20 @@ bool TTable::HasRemovedTx(ui64 txId) const
return RemovedTransactions.Contains(txId);
}
+TVector<ui64> TTable::GetOpenTxs() const
+{
+    // Walk every transaction referenced in TxRefs and keep only those that
+    // are found neither in CommittedTransactions nor in RemovedTransactions.
+    TVector<ui64> openTxs;
+
+    for (const auto& entry : TxRefs) {
+        const ui64 txId = entry.first;
+        if (CommittedTransactions.Find(txId)) {
+            continue; // already committed
+        }
+        if (RemovedTransactions.Contains(txId)) {
+            continue; // already removed
+        }
+        openTxs.push_back(txId);
+    }
+
+    return openTxs;
+}
+
TMemTable& TTable::MemTable()
{
if (!Mutable) {
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index 1b9992de118..54430fcfbc9 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -172,6 +172,8 @@ public:
bool HasCommittedTx(ui64 txId) const;
bool HasRemovedTx(ui64 txId) const;
+ TVector<ui64> GetOpenTxs() const;
+
TPartView GetPartView(const TLogoBlobID &bundle) const
{
auto *partView = Flatten.FindPtr(bundle);
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 8c50eba3293..f7f36a6afb4 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -220,6 +220,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_op_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_locks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_avl_tree.cpp
diff --git a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
index 8aef437313e..95dd8b28f05 100644
--- a/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp
@@ -63,6 +63,9 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid;
for (auto &snapshot : op->InputSnapshots()) {
+ auto* txSnapshot = dynamic_cast<TTxTableSnapshotContext*>(snapshot.Get());
+ Y_VERIFY(txSnapshot, "Unexpected input snapshot type");
+
TString snapBody = DataShard.BorrowSnapshot(localTableId, *snapshot, { }, { }, targetTablet);
txc.Env.DropSnapshot(snapshot);
@@ -84,7 +87,7 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
TRowVersion lowWatermark = mvcc ? DataShard.GetSnapshotManager().GetLowWatermark() :
TRowVersion::Min();
- if (minVersion || completeEdge || incompleteEdge || immediateWriteEdge || lowWatermark)
+ if (minVersion || completeEdge || incompleteEdge || immediateWriteEdge || lowWatermark || txSnapshot->HasOpenTxs())
extended = true; // Must use an extended format
if (extended) {
@@ -122,6 +125,10 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
rs.SetMvccLowWatermarkTxId(lowWatermark.TxId);
}
+ if (txSnapshot->HasOpenTxs()) {
+ rs.SetWithOpenTxs(true);
+ }
+
rsBody.reserve(SnapshotTransferReadSetMagic.size() + rs.ByteSizeLong());
rsBody.append(SnapshotTransferReadSetMagic);
Y_PROTOBUF_SUPPRESS_NODISCARD rs.AppendToString(&rsBody);
diff --git a/ydb/core/tx/datashard/create_table_unit.cpp b/ydb/core/tx/datashard/create_table_unit.cpp
index 85f38d456a3..e8b253cd020 100644
--- a/ydb/core/tx/datashard/create_table_unit.cpp
+++ b/ydb/core/tx/datashard/create_table_unit.cpp
@@ -56,6 +56,11 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
tableId.LocalPathId = createTableTx.GetPathId().GetLocalId();
}
+ if (DataShard.HasUserTable(tableId)) {
+ // Table already created, assume we restarted after a successful commit
+ return EExecutionStatus::Executed;
+ }
+
const ui64 schemaVersion = createTableTx.HasTableSchemaVersion() ? createTableTx.GetTableSchemaVersion() : 0u;
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 9a9b30fbe69..a60dc40a2b1 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -964,6 +964,11 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));
}
+void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) {
+    // Write the value to the Sys table and mirror it in the in-memory field,
+    // so both always describe the same local table id.
+    PersistSys(db, Schema::Sys_LastLoanTableTid, localTid);
+    LastLoanTableTid = localTid;
+}
+
TUserTable::TPtr TDataShard::CreateUserTable(TTransactionContext& txc,
const NKikimrSchemeOp::TTableDescription& tableScheme)
{
@@ -3323,6 +3328,38 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl
return true;
}
+/**
+ * Local transaction serving TEvGetOpenTxs: reads the list of open
+ * transactions of one user table and sends it back to the requester.
+ */
+class TDataShard::TTxGetOpenTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+public:
+    TTxGetOpenTxs(TDataShard* self, TEvDataShard::TEvGetOpenTxs::TPtr&& ev)
+        : TTransactionBase(self)
+        , Ev(std::move(ev))
+    { }
+
+    bool Execute(TTransactionContext& txc, const TActorContext&) override {
+        auto pathId = Ev->Get()->PathId;
+        const auto& userTables = Self->GetUserTables();
+
+        // An empty path id selects the first user table; otherwise the
+        // table is looked up by its local path id.
+        auto it = userTables.begin();
+        if (pathId) {
+            it = userTables.find(pathId.LocalPathId);
+        }
+        Y_VERIFY(it != userTables.end());
+
+        Reply = MakeHolder<TEvDataShard::TEvGetOpenTxsResult>(
+            pathId, txc.DB.GetOpenTxs(it->second->LocalTid));
+        return true;
+    }
+
+    void Complete(const TActorContext& ctx) override {
+        // Answer the original sender, echoing the request cookie.
+        ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie);
+    }
+
+private:
+    TEvDataShard::TEvGetOpenTxs::TPtr Ev;
+    THolder<TEvDataShard::TEvGetOpenTxsResult> Reply;
+};
+
+void TDataShard::Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorContext& ctx) { // Serves the TEvGetOpenTxs request (declared "for tests")
+ Execute(new TTxGetOpenTxs(this, std::move(ev)), ctx); // the transaction takes over the event and replies from its Complete()
+}
+
+
} // NDataShard
TString TEvDataShard::TEvRead::ToString() const {
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 7a7e362aea8..2c295f99005 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -305,6 +305,9 @@ struct TEvDataShard {
EvTestLoadResponse,
EvTestLoadFinished,
+ EvGetOpenTxs, /* for tests */
+ EvGetOpenTxsResult, /* for tests */
+
EvEnd
};
@@ -1573,6 +1576,24 @@ struct TEvDataShard {
}
};
+    /**
+     * Request for the list of open transactions of a user table (for tests).
+     *
+     * The handling transaction looks the table up by PathId.LocalPathId; when
+     * PathId evaluates to false it falls back to the first user table.
+     */
+    struct TEvGetOpenTxs : public TEventLocal<TEvGetOpenTxs, EvGetOpenTxs> {
+        TPathId PathId;
+
+        // explicit: a TPathId must never silently convert into an event
+        explicit TEvGetOpenTxs(const TPathId& pathId)
+            : PathId(pathId)
+        { }
+    };
+
+ struct TEvGetOpenTxsResult : public TEventLocal<TEvGetOpenTxsResult, EvGetOpenTxsResult> { // Reply to TEvGetOpenTxs
+ TPathId PathId; // path id copied from the request
+ TVector<ui64> OpenTxs; // ids of the open transactions found in the table
+
+ TEvGetOpenTxsResult(const TPathId& pathId, TVector<ui64> openTxs) // openTxs is taken by value and moved into the member
+ : PathId(pathId)
+ , OpenTxs(std::move(openTxs))
+ { }
+ };
+
};
IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info);
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index 7d296877ca0..3f2d579d6a0 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -21,6 +21,7 @@ bool TDataShard::TTxInit::Execute(TTransactionContext& txc, const TActorContext&
try {
Self->State = TShardState::Unknown;
Self->LastLocalTid = Schema::MinLocalTid;
+ Self->LastLoanTableTid = 0;
Self->LastSeqno = 1;
Self->NextChangeRecordOrder = 1;
Self->LastChangeRecordGroup = 1;
@@ -178,6 +179,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
// Reads from Sys table
LOAD_SYS_UI64(db, Schema::Sys_State, Self->State);
LOAD_SYS_UI64(db, Schema::Sys_LastLocalTid, Self->LastLocalTid);
+ LOAD_SYS_UI64(db, Schema::Sys_LastLoanTableTid, Self->LastLoanTableTid);
LOAD_SYS_UI64(db, Schema::Sys_LastSeqno, Self->LastSeqno);
LOAD_SYS_UI64(db, Schema::Sys_NextChangeRecordOrder, Self->NextChangeRecordOrder);
LOAD_SYS_UI64(db, Schema::Sys_LastChangeRecordGroup, Self->LastChangeRecordGroup);
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 6904293b1e4..159d0b516f2 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -859,6 +859,7 @@ void TActiveTransaction::BuildExecutionPlan(bool loaded)
plan.push_back(EExecutionUnitKind::Restore);
plan.push_back(EExecutionUnitKind::CreateTable);
plan.push_back(EExecutionUnitKind::ReceiveSnapshot);
+ plan.push_back(EExecutionUnitKind::ReceiveSnapshotCleanup);
plan.push_back(EExecutionUnitKind::AlterMoveShadow);
plan.push_back(EExecutionUnitKind::AlterTable);
plan.push_back(EExecutionUnitKind::DropTable);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 2ec648f65af..01333c30e66 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -60,9 +60,10 @@ using NLongTxService::TEvLongTxService;
// For CopyTable and MoveShadow
class TTxTableSnapshotContext : public NTabletFlatExecutor::TTableSnapshotContext {
public:
- TTxTableSnapshotContext(ui64 step, ui64 txId, TVector<ui32>&& tables)
+ TTxTableSnapshotContext(ui64 step, ui64 txId, TVector<ui32>&& tables, bool hasOpenTxs = false)
: StepOrder(step, txId)
, Tables(tables)
+ , HasOpenTxs_(hasOpenTxs)
{}
const TStepOrder& GetStepOrder() const {
@@ -73,9 +74,14 @@ public:
return Tables;
}
+ bool HasOpenTxs() const {
+ return HasOpenTxs_;
+ }
+
private:
TStepOrder StepOrder;
TVector<ui32> Tables;
+ bool HasOpenTxs_;
};
// For Split
@@ -207,6 +213,7 @@ class TDataShard
class TTxCompactTable;
class TTxPersistFullCompactionTs;
class TTxRemoveLock;
+ class TTxGetOpenTxs;
template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
@@ -870,6 +877,8 @@ class TDataShard
SysMvcc_ImmediateWriteEdgeStep, // 39 Maximum step of immediate writes with mvcc enabled
SysMvcc_ImmediateWriteEdgeTxId, // 40 Maximum txId of immediate writes with mvcc enabled
+ Sys_LastLoanTableTid, // 41 Last tid that we used in LoanTable
+
// reserved
SysPipeline_Flags = 1000,
SysPipeline_LimitActiveTx,
@@ -882,6 +891,7 @@ class TDataShard
static_assert(ESysTableKeys::SysMvcc_UnprotectedReads == 38, "SysMvcc_UnprotectedReads changed its value");
static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeStep == 39, "SysMvcc_ImmediateWriteEdgeStep changed its value");
static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeTxId == 40, "SysMvcc_ImmediateWriteEdgeTxId changed its value");
+ static_assert(ESysTableKeys::Sys_LastLoanTableTid == 41, "Sys_LastLoanTableTid changed its value");
static constexpr ui64 MinLocalTid = TSysTables::SysTableMAX + 1; // 1000
@@ -1064,6 +1074,8 @@ class TDataShard
void Handle(TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorContext& ctx);
+
void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);
void DoPeriodicTasks(const TActorContext &ctx);
@@ -1248,6 +1260,18 @@ public:
return prefix;
}
+ bool HasUserTable(const TPathId& tableId) { // True when TableInfos has an entry for this table
+ return TableInfos.contains(tableId.LocalPathId); // only LocalPathId is used as the key; OwnerId is not checked here
+ }
+
+    // Returns the user table registered under tableId.LocalPathId, or a null
+    // pointer when there is no such table. OwnerId is not checked here.
+    TUserTable::TCPtr FindUserTable(const TPathId& tableId) {
+        const auto found = TableInfos.find(tableId.LocalPathId);
+        if (found == TableInfos.end()) {
+            return nullptr;
+        }
+        return found->second;
+    }
+
void RemoveUserTable(const TPathId& tableId) {
TableInfos.erase(tableId.LocalPathId);
SysLocks.RemoveSchema(tableId);
@@ -1489,6 +1513,9 @@ public:
void DropUserTable(TTransactionContext& txc, ui64 tableId);
ui32 GetLastLocalTid() const { return LastLocalTid; }
+ ui32 GetLastLoanTableTid() const { return LastLoanTableTid; }
+
+ void PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid);
ui64 AllocateChangeRecordOrder(NIceDb::TNiceDb& db);
ui64 AllocateChangeRecordGroup(NIceDb::TNiceDb& db);
@@ -2103,6 +2130,7 @@ private:
// Sys table contents
ui32 State;
ui32 LastLocalTid;
+ ui32 LastLoanTableTid;
ui64 LastSeqno;
ui64 NextChangeRecordOrder;
ui64 LastChangeRecordGroup;
@@ -2465,6 +2493,7 @@ protected:
fFunc(TEvDataShard::EvReplicationSourceOffsetsAck, HandleByReplicationSourceOffsetsServer);
fFunc(TEvDataShard::EvReplicationSourceOffsetsCancel, HandleByReplicationSourceOffsetsServer);
HFunc(TEvLongTxService::TEvLockStatus, Handle);
+ HFunc(TEvDataShard::TEvGetOpenTxs, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index 999ddd5973e..a48b97301a0 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -876,6 +876,13 @@ TSysLocks::TLock TSysLocks::GetLock(const TArrayRef<const TCell>& key) const {
return TLock();
}
+void TSysLocks::EraseLock(ui64 lockId) {
+    // Requires an active update; unknown lock ids are silently ignored.
+    Y_VERIFY(Update);
+    auto* found = Locker.FindLockPtr(lockId);
+    if (!found) {
+        return;
+    }
+    // The erase is only queued on the update here.
+    Update->AddEraseLock(found);
+}
+
void TSysLocks::EraseLock(const TArrayRef<const TCell>& key) {
Y_VERIFY(Update);
if (auto* lock = Locker.FindLockPtr(GetLockId(key))) {
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index e9e844a9242..7d65c115aba 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -595,6 +595,10 @@ public:
PendingSubscribeLocks.clear();
}
+ const THashMap<ui64, TLockInfo::TPtr>& GetLocks() const { // Read-only view of all known locks, keyed by LockId
+ return Locks;
+ }
+
private:
const THolder<TLocksDataShard> Self;
THashMap<ui64, TLockInfo::TPtr> Locks; // key is LockId
@@ -799,6 +803,7 @@ public:
TVector<TLock> ApplyLocks();
ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const;
TLock GetLock(const TArrayRef<const TCell>& syslockKey) const;
+ void EraseLock(ui64 lockId);
void EraseLock(const TArrayRef<const TCell>& syslockKey);
void CommitLock(const TArrayRef<const TCell>& syslockKey);
void SetLock(const TTableId& tableId, const TArrayRef<const TCell>& key);
@@ -856,6 +861,10 @@ public:
bool Load(ILocksDb& db);
+ const THashMap<ui64, TLockInfo::TPtr>& GetLocks() const { // Read-only access to the locks held by the underlying Locker
+ return Locker.GetLocks();
+ }
+
private:
THolder<TLocksDataShard> Self;
TLockLocker Locker;
diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp
index eba1ea0a0fa..f762801c221 100644
--- a/ydb/core/tx/datashard/datashard_split_src.cpp
+++ b/ydb/core/tx/datashard/datashard_split_src.cpp
@@ -1,4 +1,6 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
+#include "setup_sys_locks.h"
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/util/pb.h>
@@ -127,6 +129,27 @@ public:
Y_VERIFY(Self->TxInFly() == 0, "Currently split operation shouldn't start while there are in-flight transactions");
+ // We need to remove all locks first, making sure persistent uncommitted
+ // changes are not borrowed by new shards. Otherwise those will become
+ // unaccounted for.
+ if (!Self->SysLocksTable().GetLocks().empty()) {
+ auto countBefore = Self->SysLocksTable().GetLocks().size();
+ TDataShardLocksDb locksDb(*Self, txc);
+ TSetupSysLocks guardLocks(*Self, &locksDb);
+ for (auto& pr : Self->SysLocksTable().GetLocks()) {
+ Self->SysLocksTable().EraseLock(pr.first);
+ if (pr.second->IsPersistent()) {
+ // Don't erase more than one persistent lock at a time
+ break;
+ }
+ }
+ Self->SysLocksTable().ApplyLocks();
+ auto countAfter = Self->SysLocksTable().GetLocks().size();
+ Y_VERIFY(countAfter < countBefore, "Expected to erase at least one lock");
+ Self->Execute(Self->CreateTxStartSplit(), ctx);
+ return true;
+ }
+
ui64 opId = Self->SrcSplitOpId;
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " starting snapshot for split OpId " << opId);
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index d86aeb3b27f..cab83571ed4 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3268,6 +3268,243 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"} Struct { Bool: false }");
}
+ Y_UNIT_TEST(LockedWriteCleanupOnSplit) { // Writes uncommitted changes under a lock, splits the table and checks the new shards report no open txs
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks; // NOTE(review): locks1 is not referenced again in this test
+ observer.Inject = {};
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ auto tableId = ResolveTableId(server, sender, "/Root/table-1");
+
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+
+ // Check shard has some open transactions
+ {
+ auto checkSender = runtime.AllocateEdgeActor();
+ runtime.SendToPipe(shards.at(0), checkSender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender);
+ UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
+ }
+
+ // Split/merge would fail otherwise :(
+ SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+
+ // Split table in two shards
+ {
+ //Cerr << "----Split Begin----" << Endl;
+ auto senderSplit = runtime.AllocateEdgeActor();
+ auto tablets = GetTableShards(server, senderSplit, "/Root/table-1");
+ ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/table-1", tablets.at(0), 3); // shadows the outer TString txId within this scope
+ WaitTxNotification(server, senderSplit, txId);
+ //Cerr << "----Split End----" << Endl;
+ }
+
+ shards = GetTableShards(server, sender, "/Root/table-1");
+
+ // Check new shards don't have any open transactions
+ {
+ auto checkSender = runtime.AllocateEdgeActor();
+ for (auto shardId : shards) {
+ runtime.SendToPipe(shardId, checkSender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender);
+ UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shardId);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(LockedWriteCleanupOnCopyTable) { // Writes uncommitted changes under a lock, copies the table and checks the copy has no open txs and never sees those changes
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to key 2 with tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks; // kept so the locks can be injected into the committing query below
+ observer.Inject = {};
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ auto tableId = ResolveTableId(server, sender, "/Root/table-1");
+
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+
+ // Check shard has some open transactions
+ {
+ auto checkSender = runtime.AllocateEdgeActor();
+ runtime.SendToPipe(shards.at(0), checkSender, new TEvDataShard::TEvGetOpenTxs(tableId.PathId));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender);
+ UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
+ }
+
+ // Copy table
+ {
+ auto senderCopy = runtime.AllocateEdgeActor();
+ ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-2", "/Root/table-1"); // shadows the outer TString txId within this scope
+ WaitTxNotification(server, senderCopy, txId);
+ }
+
+ auto shards2 = GetTableShards(server, sender, "/Root/table-2");
+ auto tableId2 = ResolveTableId(server, sender, "/Root/table-2");
+
+ // Check new shards don't have any open transactions
+ {
+ auto checkSender = runtime.AllocateEdgeActor();
+ for (auto shardId : shards2) {
+ runtime.SendToPipe(shardId, checkSender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(checkSender);
+ UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shardId);
+ }
+ }
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // Check original table has those changes visible
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } "
+ "} Struct { Bool: false }");
+
+ // Check table copy does not have those changes
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value FROM `/Root/table-2`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execution_unit.cpp b/ydb/core/tx/datashard/execution_unit.cpp
index c4dd0c88622..5e8c32a6582 100644
--- a/ydb/core/tx/datashard/execution_unit.cpp
+++ b/ydb/core/tx/datashard/execution_unit.cpp
@@ -94,6 +94,8 @@ THolder<TExecutionUnit> CreateExecutionUnit(EExecutionUnitKind kind,
return CreateCreateTableUnit(dataShard, pipeline);
case EExecutionUnitKind::ReceiveSnapshot:
return CreateReceiveSnapshotUnit(dataShard, pipeline);
+ case EExecutionUnitKind::ReceiveSnapshotCleanup:
+ return CreateReceiveSnapshotCleanupUnit(dataShard, pipeline);
case EExecutionUnitKind::AlterMoveShadow:
return CreateAlterMoveShadowUnit(dataShard, pipeline);
case EExecutionUnitKind::AlterTable:
diff --git a/ydb/core/tx/datashard/execution_unit_ctors.h b/ydb/core/tx/datashard/execution_unit_ctors.h
index e03c77e2c2a..2585a2d9b8e 100644
--- a/ydb/core/tx/datashard/execution_unit_ctors.h
+++ b/ydb/core/tx/datashard/execution_unit_ctors.h
@@ -47,6 +47,7 @@ THolder<TExecutionUnit> CreateBackupUnit(TDataShard &dataShard, TPipeline &pipel
THolder<TExecutionUnit> CreateRestoreUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateCreateTableUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateReceiveSnapshotUnit(TDataShard &dataShard, TPipeline &pipeline);
+THolder<TExecutionUnit> CreateReceiveSnapshotCleanupUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateAlterMoveShadowUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateAlterTableUnit(TDataShard &dataShard, TPipeline &pipeline);
THolder<TExecutionUnit> CreateDropTableUnit(TDataShard &dataShard, TPipeline &pipeline);
diff --git a/ydb/core/tx/datashard/execution_unit_kind.h b/ydb/core/tx/datashard/execution_unit_kind.h
index 832854d966a..a96aaaaf214 100644
--- a/ydb/core/tx/datashard/execution_unit_kind.h
+++ b/ydb/core/tx/datashard/execution_unit_kind.h
@@ -49,6 +49,7 @@ enum class EExecutionUnitKind : ui32 {
Restore,
CreateTable,
ReceiveSnapshot,
+ ReceiveSnapshotCleanup,
AlterMoveShadow,
AlterTable,
DropTable,
diff --git a/ydb/core/tx/datashard/make_snapshot_unit.cpp b/ydb/core/tx/datashard/make_snapshot_unit.cpp
index 8532f04d29e..4321daa414b 100644
--- a/ydb/core/tx/datashard/make_snapshot_unit.cpp
+++ b/ydb/core/tx/datashard/make_snapshot_unit.cpp
@@ -59,8 +59,9 @@ EExecutionStatus TMakeSnapshotUnit::Execute(TOperation::TPtr op,
Y_VERIFY(DataShard.GetUserTables().contains(tableId));
ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid;
+ auto openTxs = txc.DB.GetOpenTxs(localTableId);
TIntrusivePtr<TTableSnapshotContext> snapContext
- = new TTxTableSnapshotContext(op->GetStep(), op->GetTxId(), {localTableId});
+ = new TTxTableSnapshotContext(op->GetStep(), op->GetTxId(), {localTableId}, !openTxs.empty());
txc.Env.MakeSnapshot(snapContext);
op->SetWaitingForSnapshotFlag();
diff --git a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
new file mode 100644
index 00000000000..84238821a02
--- /dev/null
+++ b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
@@ -0,0 +1,83 @@
+#include "datashard_impl.h"
+#include "datashard_pipeline.h"
+#include "execution_unit_ctors.h"
+
+namespace NKikimr {
+namespace NDataShard {
+
+class TReceiveSnapshotCleanupUnit : public TExecutionUnit { // Execution unit that removes open transactions from user tables after a snapshot was received
+public:
+ TReceiveSnapshotCleanupUnit(TDataShard &dataShard,
+ TPipeline &pipeline);
+ ~TReceiveSnapshotCleanupUnit() override;
+
+ bool IsReadyToExecute(TOperation::TPtr op) const override; // always ready
+ EExecutionStatus Execute(TOperation::TPtr op,
+ TTransactionContext &txc,
+ const TActorContext &ctx) override; // removes open txs in batches, rescheduling when the batch limit is hit
+ void Complete(TOperation::TPtr op,
+ const TActorContext &ctx) override; // no-op
+
+private: // no private state
+};
+
+TReceiveSnapshotCleanupUnit::TReceiveSnapshotCleanupUnit(TDataShard &dataShard,
+ TPipeline &pipeline)
+ : TExecutionUnit(EExecutionUnitKind::ReceiveSnapshotCleanup, false, dataShard, pipeline) // NOTE(review): meaning of the `false` flag is defined by TExecutionUnit, not visible here — confirm
+{
+}
+
+TReceiveSnapshotCleanupUnit::~TReceiveSnapshotCleanupUnit() // the unit declares no members of its own, so there is nothing to release
+{
+}
+
+bool TReceiveSnapshotCleanupUnit::IsReadyToExecute(TOperation::TPtr) const // the operation argument is ignored
+{
+ return true; // this unit never waits for anything before executing
+}
+
+EExecutionStatus TReceiveSnapshotCleanupUnit::Execute(TOperation::TPtr op,
+                                                      TTransactionContext &txc,
+                                                      const TActorContext &)
+{
+    // Upper bound on the number of transactions removed in a single run.
+    constexpr size_t MaxRemovedTxsPerRun = 1000;
+
+    auto *activeTx = dynamic_cast<TActiveTransaction*>(op.Get());
+    Y_VERIFY_S(activeTx, "cannot cast operation of kind " << op->GetKind());
+
+    // Nothing to do unless the scheme transaction has a ReceiveSnapshot part.
+    if (!activeTx->GetSchemeTx().HasReceiveSnapshot()) {
+        return EExecutionStatus::Executed;
+    }
+
+    size_t removed = 0;
+    for (const auto& entry : DataShard.GetUserTables()) {
+        const auto localTid = entry.second->LocalTid;
+        const auto openTxs = txc.DB.GetOpenTxs(localTid);
+        for (ui64 txId : openTxs) {
+            if (removed >= MaxRemovedTxsPerRun) {
+                // Batch is full: commit what was removed so far and
+                // come back later for the remaining transactions.
+                return EExecutionStatus::Reschedule;
+            }
+            txc.DB.RemoveTx(localTid, txId);
+            ++removed;
+        }
+    }
+
+    // Any removal means the unit has made changes in this run.
+    return removed > 0
+        ? EExecutionStatus::ExecutedNoMoreRestarts
+        : EExecutionStatus::Executed;
+}
+
+void TReceiveSnapshotCleanupUnit::Complete(TOperation::TPtr,
+ const TActorContext &) // both arguments are ignored
+{ // intentionally empty: this unit has no completion work
+}
+
+THolder<TExecutionUnit> CreateReceiveSnapshotCleanupUnit(TDataShard &dataShard,
+ TPipeline &pipeline) // Factory for the ReceiveSnapshotCleanup execution unit
+{
+ return MakeHolder<TReceiveSnapshotCleanupUnit>(dataShard, pipeline); // caller receives ownership through THolder
+}
+
+} // namespace NDataShard
+} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/receive_snapshot_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_unit.cpp
index 427a136b346..45998e31832 100644
--- a/ydb/core/tx/datashard/receive_snapshot_unit.cpp
+++ b/ydb/core/tx/datashard/receive_snapshot_unit.cpp
@@ -51,16 +51,26 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op,
Y_VERIFY(schemeTx.HasCreateTable());
+ const auto &createTableTx = schemeTx.GetCreateTable();
+
+ TPathId tableId(DataShard.GetPathOwnerId(), createTableTx.GetId_Deprecated());
+ if (createTableTx.HasPathId()) {
+ Y_VERIFY(DataShard.GetPathOwnerId() == createTableTx.GetPathId().GetOwnerId());
+ tableId.LocalPathId = createTableTx.GetPathId().GetLocalId();
+ }
+
+ auto userTable = DataShard.FindUserTable(tableId);
+ Y_VERIFY(userTable);
+
const bool mvcc = DataShard.IsMvccEnabled();
+ bool hasOpenTxs = false;
+ bool loanedTables = false;
+
for (auto &pr : op->InReadSets()) {
for (auto& rsdata : pr.second) {
NKikimrTxDataShard::TSnapshotTransferReadSet rs;
- // We currently support a single readset for a single user table
- Y_VERIFY(DataShard.GetUserTables().size() == 1, "Support for more than 1 user table in a datashard is not implemented here");
- ui32 localTableId = DataShard.GetUserTables().begin()->second->LocalTid;
-
TString snapBody = rsdata.Body;
if (rsdata.Body.StartsWith(SnapshotTransferReadSetMagic)) {
@@ -90,12 +100,24 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op,
if (DataShard.GetSnapshotManager().GetLowWatermark() < lowWatermark)
DataShard.GetSnapshotManager().SetLowWatermark(db, lowWatermark);
}
+
+ if (rs.GetWithOpenTxs()) {
+ hasOpenTxs = true;
+ }
}
- txc.Env.LoanTable(localTableId, snapBody);
+ if (userTable->LocalTid != DataShard.GetLastLoanTableTid()) {
+ txc.Env.LoanTable(userTable->LocalTid, snapBody);
+ loanedTables = true;
+ }
}
}
+ if (loanedTables) {
+ // We want to make sure we won't try to loan table again on restart
+ DataShard.PersistLastLoanTableTid(db, userTable->LocalTid);
+ }
+
Y_VERIFY(DataShard.GetSnapshotManager().GetSnapshots().empty(),
"Found unexpected persistent snapshots at CopyTable destination");
@@ -110,6 +132,14 @@ EExecutionStatus TReceiveSnapshotUnit::Execute(TOperation::TPtr op,
}
}
+ if (hasOpenTxs && loanedTables) {
+ // We must wait for loan to complete, so ReceiveSnapshotCleanup would
+ // see open transactions that it needs to cleanup.
+ return EExecutionStatus::WaitComplete;
+ }
+
+ // Don't break compatibility with previous datashard versions
+ // Version 22-4 and below relied on all units finishing without interruptions
return EExecutionStatus::ExecutedNoMoreRestarts;
}