diff options
author | snaury <snaury@ydb.tech> | 2023-04-14 17:43:03 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-04-14 17:43:03 +0300 |
commit | 89641a451ba880b8b290b9e8dc8c00d236620eb3 (patch) | |
tree | 3ea23439a223308f282f0d1d4cd5b563b8354885 | |
parent | bbae5b12d1f95a60567c5c546e748cad48851d47 (diff) | |
download | ydb-89641a451ba880b8b290b9e8dc8c00d236620eb3.tar.gz |
Don't crash on locked write commit after volatile conflict commits
-rw-r--r-- | ydb/core/grpc_services/rpc_commit_transaction.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common_kqp.h | 17 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 102 |
4 files changed, 129 insertions, 2 deletions
diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp index 97a5a541a41..c6b34a738db 100644 --- a/ydb/core/grpc_services/rpc_commit_transaction.cpp +++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp @@ -102,5 +102,10 @@ void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilit f.RegisterActor(new TCommitTransactionRPC(p.release())); } +template<> +IActor* TEvCommitTransactionRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + return new TCommitTransactionRPC(msg); +} + } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 485e7562493..8df24eb1616 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -323,8 +323,11 @@ public: if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min()); lock && !VolatileCommitOrdered) { lock->ForAllVolatileDependencies([this](ui64 txId) { - if (VolatileDependencies.insert(txId).second && !VolatileTxId) { - VolatileTxId = EngineBay.GetTxId(); + auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId); + if (info && info->State != EVolatileTxState::Aborting) { + if (VolatileDependencies.insert(txId).second && !VolatileTxId) { + VolatileTxId = EngineBay.GetTxId(); + } } }); } diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 8bfc5c5ec34..db3fbed58bf 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -17,6 +17,9 @@ namespace NKqpHelpers { using TEvDeleteSessionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>; + using TEvCommitTransactionRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest, + Ydb::Table::CommitTransactionResponse>; + template<class TResp> inline TResp AwaitResponse(TTestActorRuntime& runtime, NThreading::TFuture<TResp> f) { if (!f.HasValue() && !f.HasException()) { @@ -50,6 +53,20 @@ namespace NKqpHelpers { return sessionId; } + inline TString CommitTransactionRPC(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& database = {}) { + Ydb::Table::CommitTransactionRequest request; + request.set_session_id(sessionId); + request.set_tx_id(txId); + + auto future = NRpcService::DoLocalRpc<TEvCommitTransactionRequest>( + std::move(request), database, "", runtime.GetActorSystem(0)); + auto response = AwaitResponse(runtime, future); + if (response.operation().status() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.operation().status(); + } + return ""; + } + inline NThreading::TFuture<Ydb::Table::ExecuteDataQueryResponse> SendRequest( TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {}) { diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index aaaabd301fc..8fac76fa665 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1703,6 +1703,108 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { UNIT_ASSERT(bulkUpsertFuture.HasValue()); } + Y_UNIT_TEST(DistributedWriteThenLateWriteReadCommit) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000) + .SetEnableKqpImmediateEffects(true) + .SetEnableDataShardVolatileTransactions(true); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + ui64 maxReadSetStep = 0; + bool captureReadSets = true; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep()); + if (captureReadSets) { + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", /* commitTx */ true), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + // Make an uncommitted write and read it to make sure it's flushed to datashard + TString sessionId2 = CreateSessionRPC(runtime, "/Root"); + TString txId2; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId2, txId2, R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 22); + )"), + "<empty>"); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId2, txId2, R"( + SELECT key, value FROM `/Root/table-1` WHERE key = 2; + )"), + "{ items { uint32_value: 2 } items { uint32_value: 22 } }"); + + // Unblock readsets and let it commit + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : std::exchange(capturedReadSets, {})) { + runtime.Send(ev.Release(), 0, true); + } + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(future))), + "<empty>"); + + // Commit the transaction, it should succeed without crashing + UNIT_ASSERT_VALUES_EQUAL( + CommitTransactionRPC(runtime, sessionId2, txId2), + ""); + + // Verify the result + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 20 } items { uint32_value: 20 } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr |