aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-04-14 17:43:03 +0300
committersnaury <snaury@ydb.tech>2023-04-14 17:43:03 +0300
commit89641a451ba880b8b290b9e8dc8c00d236620eb3 (patch)
tree3ea23439a223308f282f0d1d4cd5b563b8354885
parentbbae5b12d1f95a60567c5c546e748cad48851d47 (diff)
downloadydb-89641a451ba880b8b290b9e8dc8c00d236620eb3.tar.gz
Don't crash on locked write commit after volatile conflict commits
-rw-r--r--ydb/core/grpc_services/rpc_commit_transaction.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h17
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp102
4 files changed, 129 insertions, 2 deletions
diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp
index 97a5a541a41..c6b34a738db 100644
--- a/ydb/core/grpc_services/rpc_commit_transaction.cpp
+++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp
@@ -102,5 +102,10 @@ void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilit
f.RegisterActor(new TCommitTransactionRPC(p.release()));
}
+template<>
+IActor* TEvCommitTransactionRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
+ return new TCommitTransactionRPC(msg);
+}
+
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 485e7562493..8df24eb1616 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -323,8 +323,11 @@ public:
if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min()); lock && !VolatileCommitOrdered) {
lock->ForAllVolatileDependencies([this](ui64 txId) {
- if (VolatileDependencies.insert(txId).second && !VolatileTxId) {
- VolatileTxId = EngineBay.GetTxId();
+ auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId);
+ if (info && info->State != EVolatileTxState::Aborting) {
+ if (VolatileDependencies.insert(txId).second && !VolatileTxId) {
+ VolatileTxId = EngineBay.GetTxId();
+ }
}
});
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 8bfc5c5ec34..db3fbed58bf 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -17,6 +17,9 @@ namespace NKqpHelpers {
using TEvDeleteSessionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest,
Ydb::Table::DeleteSessionResponse>;
+ using TEvCommitTransactionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest,
+ Ydb::Table::CommitTransactionResponse>;
+
template<class TResp>
inline TResp AwaitResponse(TTestActorRuntime& runtime, NThreading::TFuture<TResp> f) {
if (!f.HasValue() && !f.HasException()) {
@@ -50,6 +53,20 @@ namespace NKqpHelpers {
return sessionId;
}
+ inline TString CommitTransactionRPC(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& database = {}) {
+ Ydb::Table::CommitTransactionRequest request;
+ request.set_session_id(sessionId);
+ request.set_tx_id(txId);
+
+ auto future = NRpcService::DoLocalRpc<TEvCommitTransactionRequest>(
+ std::move(request), database, "", runtime.GetActorSystem(0));
+ auto response = AwaitResponse(runtime, future);
+ if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.operation().status();
+ }
+ return "";
+ }
+
inline NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> SendRequest(
TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {})
{
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index aaaabd301fc..8fac76fa665 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1703,6 +1703,108 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
UNIT_ASSERT(bulkUpsertFuture.HasValue());
}
+ Y_UNIT_TEST(DistributedWriteThenLateWriteReadCommit) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000)
+ .SetEnableKqpImmediateEffects(true)
+ .SetEnableDataShardVolatileTransactions(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ ui64 maxReadSetStep = 0;
+ bool captureReadSets = true;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep());
+ if (captureReadSets) {
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", /* commitTx */ true), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ // Make an uncommitted write and read it to make sure it's flushed to datashard
+ TString sessionId2 = CreateSessionRPC(runtime, "/Root");
+ TString txId2;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId2, txId2, R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22);
+ )"),
+ "<empty>");
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleContinue(runtime, sessionId2, txId2, R"(
+ SELECT key, value FROM `/Root/table-1` WHERE key = 2;
+ )"),
+ "{ items { uint32_value: 2 } items { uint32_value: 22 } }");
+
+ // Unblock readsets and let it commit
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : std::exchange(capturedReadSets, {})) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(future))),
+ "<empty>");
+
+ // Commit the transaction, it should succeed without crashing
+ UNIT_ASSERT_VALUES_EQUAL(
+ CommitTransactionRPC(runtime, sessionId2, txId2),
+ "");
+
+ // Verify the result
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 22 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 20 } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr