author | Aleksei Borzenkov <snaury@ydb.tech> | 2024-12-18 14:06:47 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-18 14:06:47 +0300 |
commit | aed1e77ecc63a9f1b7a2a37a4470dcc7ed60f38a (patch) | |
tree | 9b01502a4feb4bd2431204cea645e7b67f910625 | |
parent | dda6b209fc1a89c124607e47564e594cf6347b9d (diff) | |
download | ydb-aed1e77ecc63a9f1b7a2a37a4470dcc7ed60f38a.tar.gz |
Abort volatile transactions during graceful restarts (#12689)
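In short: when a datashard begins a graceful restart, volatile transactions that were prepared but not yet executed are now aborted immediately with an explicit result (instead of leaving the client with an unknown transaction state), and the KQP executer reports a lost shard connection as UNAVAILABLE rather than UNDETERMINED. A condensed sketch of the new abort path, simplified from the `OnStopping` changes in the diff below (result-event construction is elided; the write path builds a `TEvWriteResult` instead of a `TEvProposeTransactionResult`):

```cpp
// Sketch, condensed from TActiveTransaction::OnStopping and
// TWriteOperation::OnStopping in the diff below.
if (HasVolatilePrepareFlag()) {
    // Volatile transactions may be aborted at any time unless executed.
    if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
        // ... reply to the proposer with an explicit ABORTED result ...

        // Also send acks and nodata readsets to expecting participants
        std::vector<std::unique_ptr<IEventHandle>> cleanupReplies;
        self.GetCleanupReplies(this, cleanupReplies);
        for (auto& ev : cleanupReplies) {
            TActivationContext::Send(ev.release());
        }

        SetResultSentFlag();
        return true; // the operation can be discarded
    }

    // Executed transactions still have to wait until committed:
    // there is no way to hand off a volatile commit during restart.
    return false;
}
```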
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 6
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 6
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.cpp | 33
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 1
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp | 28
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 210
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.cpp | 28
7 files changed, 291 insertions, 21 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 259b2a71162..9172c245f8a 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -654,7 +654,7 @@ private:
         LOG_E("Shard " << tabletId << " transaction lost during reconnect: " << record.GetStatus());
 
         CancelProposal(tabletId);
-        ReplyTxStateUnknown(tabletId);
+        ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << tabletId);
     }
 
     void HandlePrepare(TEvDqCompute::TEvState::TPtr& ev) {
@@ -696,7 +696,7 @@ private:
                     return ReplyUnavailable(TStringBuilder() << "Could not prepare program on shard " << msg->TabletId);
                 }
 
-                return ReplyTxStateUnknown(msg->TabletId);
+                return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
             }
 
             case TShardState::EState::Prepared: {
@@ -717,7 +717,7 @@ private:
                     << (msg->NotDelivered ? ", last message not delivered" : ""));
 
                 CancelProposal(0);
-                return ReplyTxStateUnknown(msg->TabletId);
+                return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
             }
 
             case TShardState::EState::Initial:
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 1e4952a62e8..5cc344c85ce 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -564,7 +564,7 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven
     delayedAcks.clear();
 }
 
-void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
+void TDataShard::GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
     if (!op->HasOutputData()) {
         // There are no replies
         return;
     }
@@ -588,6 +588,10 @@
     expectedReadSets.clear();
 }
 
+void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
+    GetCleanupReplies(op.Get(), cleanupReplies);
+}
+
 void TDataShard::SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies) {
     if (replies.empty()) {
         return;
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 186c5b563da..75986789c84 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -919,12 +919,12 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)
                 << " because datashard " << self.TabletID()
                 << " is restarting";
 
-            auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
+            auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
                 kind, self.TabletID(), GetTxId(), rejectStatus);
             result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
             LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
 
-            ctx.Send(GetTarget(), result.Release(), 0, GetCookie());
+            ctx.Send(GetTarget(), result.release(), 0, GetCookie());
 
             self.IncCounter(COUNTER_PREPARE_OVERLOADED);
             self.IncCounter(COUNTER_PREPARE_COMPLETE);
@@ -933,6 +933,35 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)
         // Immediate ops become ready when stopping flag is set
         return true;
+    } else if (HasVolatilePrepareFlag()) {
+        // Volatile transactions may be aborted at any time unless executed
+        // Note: we need to send the result (and discard the transaction) as
+        // soon as possible, because new transactions are unlikely to execute
+        // and commits will even more likely fail.
+        if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
+            auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind());
+            auto status = NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED;
+            auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
+                kind, self.TabletID(), GetTxId(), status);
+            result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, TStringBuilder()
+                << "DataShard " << self.TabletID() << " is restarting");
+            ctx.Send(GetTarget(), result.release(), 0, GetCookie());
+
+            // Make sure we also send acks and nodata readsets to expecting participants
+            std::vector<std::unique_ptr<IEventHandle>> cleanupReplies;
+            self.GetCleanupReplies(this, cleanupReplies);
+
+            for (auto& ev : cleanupReplies) {
+                TActivationContext::Send(ev.release());
+            }
+
+            SetResultSentFlag();
+            return true;
+        }
+
+        // Executed transactions will have to wait until committed
+        // There is no way to hand-off committing volatile transactions for now
+        return false;
     } else {
         // Distributed operations send notification when proposed
         if (GetTarget() && !HasCompletedFlag()) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 92143bb14ef..86e12e8961c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1494,6 +1494,7 @@ public:
                             TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
     void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
     void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const;
+    void GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
     void GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
     void SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies);
     void SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies);
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
index 5059a9d979a..658d59f51ec 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
@@ -273,20 +273,26 @@
 Y_UNIT_TEST(ProposeResultLost_RwTx) {
     TestProposeResultLost(*fixture.Runtime, fixture.Client, Q_(R"(
         upsert into `/Root/table-1` (key, value) VALUES
-            (1, 1), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
+            (1, 11), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
         )"),
         [](const NKikimrKqp::TEvQueryResponse& record) {
-            UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNDETERMINED, record.DebugString());
-
-            TIssues issues;
-            IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
-            UNIT_ASSERT_C(HasIssue(issues, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
-                "State of operation is unknown."), record.GetResponse().DebugString());
-
-            UNIT_ASSERT_C(HasIssue(issues, NKikimrIssues::TIssuesIds::TX_STATE_UNKNOWN, "", [] (const TIssue& issue) {
-                return issue.GetMessage().StartsWith("Tx state unknown for shard ");
-            }), record.GetResponse().DebugString());
+            UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNAVAILABLE, record.DebugString());
+
+            TIssues issues;
+            IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+            UNIT_ASSERT_C(
+                HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
+                    "Kikimr cluster or one of its subsystems was unavailable."),
+                record.GetResponse().DebugString());
         });
+
+    // Verify that the transaction didn't commit
+    UNIT_ASSERT_VALUES_EQUAL(
+        KqpSimpleExec(*fixture.Runtime,
+            Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key")),
+        "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+        "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+        "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
 }
 
 } // suite
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index f5af54a5455..ec1e0dcd8f7 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -3017,10 +3017,12 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
         // We need to fill table with some data
         Cerr << "========= Upserting initial values =========" << Endl;
-        KqpSimpleExec(runtime, R"(
-            UPSERT INTO `/Root/table` (key, subkey, value)
-            VALUES (1, 1), (11, 11)
-        )");
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, R"(
+                UPSERT INTO `/Root/table` (key, value)
+                VALUES (1, 1), (11, 11)
+            )"),
+            "<empty>");
 
         TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
         TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
@@ -3077,6 +3079,206 @@
         Cerr << "... split finished" << Endl;
     }
 
+    Y_UNIT_TEST(DistributedUpsertRestartBeforePrepare) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableDataShardVolatileTransactions(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        Cerr << "========= Creating the table =========" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSchemeExec(runtime, R"(
+                CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
+                WITH (PARTITION_AT_KEYS = (10));
+            )"),
+            "SUCCESS");
+
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+        // We need to fill table with some data
+        Cerr << "========= Upserting initial values =========" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, R"(
+                UPSERT INTO `/Root/table` (key, value)
+                VALUES (1, 1), (11, 11)
+            )"),
+            "<empty>");
+
+        TBlockEvents<TEvDataShard::TEvProposeTransaction> blockedPrepare(runtime);
+
+        Cerr << "========= Starting upsert 1 =========" << Endl;
+        auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table` (key, value)
+            VALUES (2, 2), (12, 12);
+        )");
+
+        runtime.WaitFor("prepare requests", [&]{ return blockedPrepare.size() >= 2; });
+        UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);
+
+        blockedPrepare.Stop();
+
+        Cerr << "========= Restarting shard 1 =========" << Endl;
+        GracefulRestartTablet(runtime, shards.at(0), sender);
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
+            "ERROR: UNAVAILABLE");
+    }
+
+    Y_UNIT_TEST(DistributedUpsertRestartAfterPrepare) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableDataShardVolatileTransactions(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        Cerr << "========= Creating the table =========" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSchemeExec(runtime, R"(
+                CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
+                WITH (PARTITION_AT_KEYS = (10));
+            )"),
+            "SUCCESS");
+
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+        // We need to fill table with some data
+        Cerr << "========= Upserting initial values =========" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, R"(
+                UPSERT INTO `/Root/table` (key, value)
+                VALUES (1, 1), (11, 11)
+            )"),
+            "<empty>");
+
+        TBlockEvents<TEvDataShard::TEvProposeTransactionResult> blockedPrepare(runtime);
+
+        Cerr << "========= Starting upsert 1 =========" << Endl;
+        auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table` (key, value)
+            VALUES (2, 2), (12, 12);
+        )");
+
+        runtime.WaitFor("prepare results", [&]{ return blockedPrepare.size() >= 2; });
+        UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);
+
+        for (auto& ev : blockedPrepare) {
+            auto* msg = ev->Get();
+            UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus(), NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED);
+        }
+
+        // Unblock prepare results and restart the first shard
+        blockedPrepare.Stop().Unblock();
+
+        Cerr << "========= Restarting shard 1 =========" << Endl;
+        GracefulRestartTablet(runtime, shards.at(0), sender);
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
+            "ERROR: ABORTED");
+    }
+
+    Y_UNIT_TEST(DistributedUpsertRestartAfterPlan) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableDataShardVolatileTransactions(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        Cerr << "========= Creating the table =========" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSchemeExec(runtime, R"(
+                CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
+                WITH (PARTITION_AT_KEYS = (10));
+            )"),
+            "SUCCESS");
+
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+        // We need to fill table with some data
+        Cerr << "========= Upserting initial values =========" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, R"(
+                UPSERT INTO `/Root/table` (key, value)
+                VALUES (1, 1), (11, 11)
+            )"),
+            "<empty>");
+
+        TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime);
+
+        Cerr << "========= Starting upsert 1 =========" << Endl;
+        auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table` (key, value)
+            VALUES (2, 2), (12, 12);
+        )");
+
+        runtime.WaitFor("shard plans", [&]{ return blockedPlan.size() >= 2; });
+        UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
+
+        // Block TEvPrivate::TEvProgressTransaction for shard1
+        auto shard1actor = ResolveTablet(runtime, shards.at(0));
+        TBlockEvents<IEventHandle> blockedProgress(runtime,
+            [&](const TAutoPtr<IEventHandle>& ev) {
+                return ev->GetRecipientRewrite() == shard1actor &&
+                    ev->GetTypeRewrite() == EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0;
+            });
+
+        // Unblock prepare results and restart the first shard
+        blockedPlan.Stop().Unblock();
+        runtime.WaitFor("blocked progress", [&]{ return blockedProgress.size() >= 1; });
+        UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1u);
+
+        Cerr << "... sleeping for 1 second" << Endl;
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        Cerr << "========= Restarting shard 1 =========" << Endl;
+        GracefulRestartTablet(runtime, shards.at(0), sender);
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
+            "ERROR: ABORTED");
+
+        Cerr << "========= Checking table =========" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, R"(
+                SELECT key, value FROM `/Root/table`
+                ORDER BY key;
+            )"),
+            "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+            "{ items { uint32_value: 11 } items { uint32_value: 11 } }");
+    }
+
 } // Y_UNIT_TEST_SUITE(DataShardVolatile)
 
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index 10db8b9c679..e433b50fc3c 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -609,6 +609,34 @@ bool TWriteOperation::OnStopping(TDataShard& self, const TActorContext& ctx) {
         // Immediate ops become ready when stopping flag is set
         return true;
+    } else if (HasVolatilePrepareFlag()) {
+        // Volatile transactions may be aborted at any time unless executed
+        // Note: we need to send the result (and discard the transaction) as
+        // soon as possible, because new transactions are unlikely to execute
+        // and commits will even more likely fail.
+        if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
+            auto status = NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED;
+            TString reason = TStringBuilder()
+                << "DataShard " << TabletId << " is restarting";
+            auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletId, GetTxId(), status, std::move(reason));
+
+            ctx.Send(GetTarget(), result.release(), 0, GetCookie());
+
+            // Make sure we also send acks and nodata readsets to expecting participants
+            std::vector<std::unique_ptr<IEventHandle>> cleanupReplies;
+            self.GetCleanupReplies(this, cleanupReplies);
+
+            for (auto& ev : cleanupReplies) {
+                TActivationContext::Send(ev.release());
+            }
+
+            SetResultSentFlag();
+            return true;
+        }
+
+        // Executed transactions will have to wait until committed
+        // There is no way to hand-off committing volatile transactions for now
+        return false;
     } else {
         // Distributed operations send notification when proposed
        if (GetTarget() && !HasCompletedFlag()) {
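Note the split in client-visible outcomes that the new tests pin down: a restart while prepare requests or their results are still in flight surfaces through the executer's disconnect path as UNAVAILABLE, while a restart after the executer has already seen PREPARED surfaces as the shard's explicit ABORTED result. In both cases the follow-up SELECTs verify that the aborted transaction left no partial writes.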