summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/tx/datashard/datashard_ut_write.cpp101
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.cpp3
2 files changed, 103 insertions, 1 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_write.cpp b/ydb/core/tx/datashard/datashard_ut_write.cpp
index 6291554537f..906ce5d909b 100644
--- a/ydb/core/tx/datashard/datashard_ut_write.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_write.cpp
@@ -1586,5 +1586,106 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
}
}
+ Y_UNIT_TEST(PreparedDistributedWritePageFault) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableDataShardVolatileTransactions(false);
+
+ auto [runtime, server, sender] = TestCreateServer(serverSettings);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+
+ // Use a policy without levels and very small page sizes, effectively making each row on its own page
+ NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy();
+ policy->MinDataPageSize = 1;
+
+ auto opts = TShardedTableOptions()
+ .Columns({{"key", "Int32", true, false},
+ {"value", "Int32", false, false}})
+ .Policy(policy.Get());
+ const auto& columns = opts.Columns_;
+ auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table", opts);
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+
+ const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
+
+ const ui64 lockTxId1 = 1234567890001;
+ const ui64 lockNodeId = runtime.GetNodeId(0);
+ NLongTxService::TLockHandle lockHandle1(lockTxId1, runtime.GetActorSystem(0));
+
+ auto shard1 = shards.at(0);
+ NKikimrDataEvents::TLock lock1shard1;
+
+ // 1. Make an uncommitted write (lock1 shard1)
+ {
+ Cerr << "... making an uncommmited write to " << shard1 << Endl;
+ auto req = MakeWriteRequestOneKeyValue(
+ std::nullopt,
+ NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
+ NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+ tableId,
+ columns,
+ 1, 11);
+ req->SetLockId(lockTxId1, lockNodeId);
+ auto result = Write(runtime, sender, shard1, std::move(req));
+ UNIT_ASSERT_VALUES_EQUAL(result.GetTxLocks().size(), 1u);
+ lock1shard1 = result.GetTxLocks().at(0);
+ UNIT_ASSERT_C(lock1shard1.GetCounter() < 1000, "Unexpected lock in the result: " << lock1shard1.ShortDebugString());
+ }
+
+ // 2. Compact and reboot the tablet
+ Cerr << "... compacting shard " << shard1 << Endl;
+ CompactTable(runtime, shard1, tableId, false);
+ Cerr << "... rebooting shard " << shard1 << Endl;
+ RebootTablet(runtime, shard1, sender);
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ // 3. Prepare a distributed write (single shard for simplicity)
+ ui64 txId1 = 1234567890011;
+ auto tx1sender = runtime.AllocateEdgeActor();
+ {
+ auto req1 = MakeWriteRequestOneKeyValue(
+ txId1,
+ NKikimrDataEvents::TEvWrite::MODE_PREPARE,
+ NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+ tableId,
+ columns,
+ 1, 22);
+ req1->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
+
+ Cerr << "... preparing tx1 at " << shard1 << Endl;
+ auto res1 = Write(runtime, tx1sender, shard1, std::move(req1));
+
+ // Reboot, making sure tx is only loaded after it's planned
+ // This causes tx to skip conflicts cache and go to execution
+ // The first attempt to execute will page fault looking for conflicts
+ // Tx will be released, and will trigger the bug on restore
+ Cerr << "... rebooting shard " << shard1 << Endl;
+ RebootTablet(runtime, shard1, sender);
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ ui64 minStep = res1.GetMinStep();
+ ui64 maxStep = res1.GetMaxStep();
+
+ Cerr << "... planning tx1 at " << coordinator << Endl;
+ SendProposeToCoordinator(
+ runtime, tx1sender, { shard1 }, {
+ .TxId = txId1,
+ .Coordinator = coordinator,
+ .MinStep = minStep,
+ .MaxStep = maxStep,
+ });
+ }
+
+ // 4. Check tx1 reply (it must succeed)
+ {
+ Cerr << "... waiting for tx1 result" << Endl;
+ auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx1sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+ }
+ }
+
} // Y_UNIT_TEST_SUITE
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index 5ec4e5e92ba..0d67c7f0235 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -416,8 +416,9 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self)
void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase& provider) {
ReleasedTxDataSize = provider.GetMemoryLimit() + provider.GetRequestedMemory();
- if (!WriteTx || IsTxDataReleased())
+ if (!WriteTx || WriteTx->GetIsReleased()) {
return;
+ }
WriteTx->ReleaseTxData();
// Immediate transactions have no body stored.