author     Aleksei Borzenkov <snaury@ydb.tech>   2024-12-18 14:06:47 +0300
committer  GitHub <noreply@github.com>           2024-12-18 14:06:47 +0300
commit     aed1e77ecc63a9f1b7a2a37a4470dcc7ed60f38a
tree       9b01502a4feb4bd2431204cea645e7b67f910625
parent     dda6b209fc1a89c124607e47564e594cf6347b9d
Abort volatile transactions during graceful restarts (#12689)
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_data_executer.cpp         6
-rw-r--r--  ydb/core/tx/datashard/datashard.cpp                       6
-rw-r--r--  ydb/core/tx/datashard/datashard_active_transaction.cpp   33
-rw-r--r--  ydb/core/tx/datashard/datashard_impl.h                    1
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp        28
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_volatile.cpp         210
-rw-r--r--  ydb/core/tx/datashard/datashard_write_operation.cpp      28
7 files changed, 291 insertions(+), 21 deletions(-)
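
Taken together, these changes make a gracefully restarting datashard abort any volatile transaction that has been prepared but not yet executed, replying right away instead of leaving the outcome unknown. Both TActiveTransaction and TWriteOperation gain the same branch; a condensed sketch of the new OnStopping() decision follows (illustrative only: SendAbortedResult and SendCleanupAcks are invented wrappers, and the pre-existing rejection paths are omitted):

    // Condensed sketch of the OnStopping() branch added in this commit.
    // Names mirror the diff; SendAbortedResult/SendCleanupAcks are invented.
    bool OnStopping(TDataShard& self, const TActorContext& ctx) {
        if (IsImmediate()) {
            return true;                      // ready once the stopping flag is set
        } else if (HasVolatilePrepareFlag()) {
            if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
                SendAbortedResult(self, ctx); // ABORTED / STATUS_ABORTED reply
                SendCleanupAcks(self);        // acks + nodata readsets to peers
                SetResultSentFlag();
                return true;                  // the transaction can be discarded
            }
            return false;                     // already executed: wait for commit
        } else {
            return NotifyAndWait(ctx);        // invented stand-in for the old path
        }
    }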
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 259b2a71162..9172c245f8a 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -654,7 +654,7 @@ private:
LOG_E("Shard " << tabletId << " transaction lost during reconnect: " << record.GetStatus());
CancelProposal(tabletId);
- ReplyTxStateUnknown(tabletId);
+ ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << tabletId);
}
void HandlePrepare(TEvDqCompute::TEvState::TPtr& ev) {
@@ -696,7 +696,7 @@ private:
return ReplyUnavailable(TStringBuilder() << "Could not prepare program on shard " << msg->TabletId);
}
- return ReplyTxStateUnknown(msg->TabletId);
+ return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
}
case TShardState::EState::Prepared: {
@@ -717,7 +717,7 @@ private:
<< (msg->NotDelivered ? ", last message not delivered" : ""));
CancelProposal(0);
- return ReplyTxStateUnknown(msg->TabletId);
+ return ReplyUnavailable(TStringBuilder() << "Disconnected from shard " << msg->TabletId);
}
case TShardState::EState::Initial:
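
The executer changes above replace ReplyTxStateUnknown with ReplyUnavailable at the three points where a shard connection is lost before the transaction is planned. Since an unexecuted volatile transaction is now guaranteed to abort, the outcome is no longer unknown, and UNAVAILABLE is safe to surface. A sketch of the assumed client-side consequence (not part of the patch):

    // Assumed client-side logic, for illustration only: UNAVAILABLE means the
    // transaction is known not to have committed, so a retry cannot apply the
    // effects twice; UNDETERMINED gave no such guarantee.
    bool IsSafeToRetry(Ydb::StatusIds::StatusCode status) {
        switch (status) {
            case Ydb::StatusIds::UNAVAILABLE:
                return true;    // outcome known: not committed
            case Ydb::StatusIds::UNDETERMINED:
                return false;   // outcome unknown: retry may double-apply
            default:
                return false;   // conservatively treat the rest as non-retryable
        }
    }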
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 1e4952a62e8..5cc344c85ce 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -564,7 +564,7 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven
delayedAcks.clear();
}
-void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
+void TDataShard::GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
if (!op->HasOutputData()) {
// There are no replies
return;
@@ -588,6 +588,10 @@ void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::
expectedReadSets.clear();
}
+void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
+ GetCleanupReplies(op.Get(), cleanupReplies);
+}
+
void TDataShard::SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies) {
if (replies.empty()) {
return;
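
The new GetCleanupReplies overload takes a raw TOperation* so the operation can pass `this` from inside its own OnStopping handler, where no refcounted pointer is at hand; the existing TOperation::TPtr overload becomes a one-line forwarder. A self-contained toy model of the pattern, with std::shared_ptr standing in for the intrusive TPtr:

    // Toy model of the overload-forwarding pattern, illustrative only.
    #include <memory>
    #include <vector>

    struct TOp {};

    // The raw-pointer overload does the actual work...
    void GetReplies(TOp* op, std::vector<int>& out) {
        out.push_back(42);  // stand-in for collecting cleanup replies
    }

    // ...and the smart-pointer overload forwards to it, so callers holding a
    // smart pointer and the operation itself (via `this`) share one body.
    void GetReplies(const std::shared_ptr<TOp>& op, std::vector<int>& out) {
        GetReplies(op.get(), out);
    }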
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 186c5b563da..75986789c84 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -919,12 +919,12 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)
<< " because datashard "
<< self.TabletID()
<< " is restarting";
- auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
+ auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
kind, self.TabletID(), GetTxId(), rejectStatus);
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
- ctx.Send(GetTarget(), result.Release(), 0, GetCookie());
+ ctx.Send(GetTarget(), result.release(), 0, GetCookie());
self.IncCounter(COUNTER_PREPARE_OVERLOADED);
self.IncCounter(COUNTER_PREPARE_COMPLETE);
@@ -933,6 +933,35 @@ bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx)
// Immediate ops become ready when stopping flag is set
return true;
+ } else if (HasVolatilePrepareFlag()) {
+ // Volatile transactions may be aborted at any time until they are executed.
+ // Note: we need to send the result (and discard the transaction) as
+ // soon as possible, because new transactions are unlikely to execute
+ // and commits are even more likely to fail.
+ if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
+ auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind());
+ auto status = NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED;
+ auto result = std::make_unique<TEvDataShard::TEvProposeTransactionResult>(
+ kind, self.TabletID(), GetTxId(), status);
+ result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, TStringBuilder()
+ << "DataShard " << self.TabletID() << " is restarting");
+ ctx.Send(GetTarget(), result.release(), 0, GetCookie());
+
+ // Make sure we also send acks and nodata readsets to expecting participants
+ std::vector<std::unique_ptr<IEventHandle>> cleanupReplies;
+ self.GetCleanupReplies(this, cleanupReplies);
+
+ for (auto& ev : cleanupReplies) {
+ TActivationContext::Send(ev.release());
+ }
+
+ SetResultSentFlag();
+ return true;
+ }
+
+ // Executed transactions will have to wait until they are committed;
+ // there is no way to hand off committing volatile transactions for now.
+ return false;
} else {
// Distributed operations send notification when proposed
if (GetTarget() && !HasCompletedFlag()) {
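
The triple guard in the volatile branch above (no result sent, no result produced, not completed) makes the ABORTED reply both exactly-once and limited to transactions that never executed. A minimal self-contained sketch of the same guard:

    // Toy sketch of the reply-exactly-once guard (standard C++, illustrative).
    struct TxFlags {
        bool ResultSent = false;  // a result event already went out
        bool HasResult  = false;  // execution produced a result
        bool Completed  = false;  // the operation finished executing
    };

    // Returns true when the operation may be aborted and discarded right away.
    bool TryAbortOnStop(TxFlags& tx) {
        if (!tx.ResultSent && !tx.HasResult && !tx.Completed) {
            // ... reply ABORTED and send cleanup acks/readsets here ...
            tx.ResultSent = true;  // guarantees we never reply twice
            return true;
        }
        return false;              // already executed: must wait for the commit
    }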
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 92143bb14ef..86e12e8961c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1494,6 +1494,7 @@ public:
TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const;
+ void GetCleanupReplies(TOperation* op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
void GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
void SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies);
void SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies);
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
index 5059a9d979a..658d59f51ec 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_errors.cpp
@@ -273,20 +273,26 @@ Y_UNIT_TEST(ProposeResultLost_RwTx) {
TestProposeResultLost(*fixture.Runtime, fixture.Client,
Q_(R"(
upsert into `/Root/table-1` (key, value) VALUES
- (1, 1), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
+ (1, 11), (1073741823, 1073741823), (2147483647, 2147483647), (4294967295, 4294967295)
)"),
[](const NKikimrKqp::TEvQueryResponse& record) {
- UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNDETERMINED, record.DebugString());
-
- TIssues issues;
- IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
- UNIT_ASSERT_C(HasIssue(issues, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN,
- "State of operation is unknown."), record.GetResponse().DebugString());
-
- UNIT_ASSERT_C(HasIssue(issues, NKikimrIssues::TIssuesIds::TX_STATE_UNKNOWN, "", [] (const TIssue& issue) {
- return issue.GetMessage().StartsWith("Tx state unknown for shard ");
- }), record.GetResponse().DebugString());
+ UNIT_ASSERT_VALUES_EQUAL_C(record.GetYdbStatus(), Ydb::StatusIds::UNAVAILABLE, record.DebugString());
+
+ TIssues issues;
+ IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+ UNIT_ASSERT_C(
+ HasIssue(issues, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
+ "Kikimr cluster or one of its subsystems was unavailable."),
+ record.GetResponse().DebugString());
});
+
+ // Verify that the transaction didn't commit
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(*fixture.Runtime,
+ Q_("SELECT key, value FROM `/Root/table-1` ORDER BY key")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
}
} // suite
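
The updated expectation mirrors the executer change: a connection lost during prepare now yields UNAVAILABLE with a retryable issue instead of UNDETERMINED with "tx state unknown". Note that the upsert was also changed to write (1, 11) rather than (1, 1): the new read-back at the end asserts the table still holds its seed rows (1, 1), (2, 2), (3, 3), so an accidentally committed transaction would be caught by the changed value of key 1 as well as by the extra keys.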
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index f5af54a5455..ec1e0dcd8f7 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -3017,10 +3017,12 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
// We need to fill the table with some data
Cerr << "========= Upserting initial values =========" << Endl;
- KqpSimpleExec(runtime, R"(
- UPSERT INTO `/Root/table` (key, subkey, value)
- VALUES (1, 1), (11, 11)
- )");
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ UPSERT INTO `/Root/table` (key, value)
+ VALUES (1, 1), (11, 11)
+ )"),
+ "<empty>");
TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
@@ -3077,6 +3079,206 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
Cerr << "... split finished" << Endl;
}
+ Y_UNIT_TEST(DistributedUpsertRestartBeforePrepare) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableDataShardVolatileTransactions(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ Cerr << "========= Creating the table =========" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSchemeExec(runtime, R"(
+ CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
+ WITH (PARTITION_AT_KEYS = (10));
+ )"),
+ "SUCCESS");
+
+ const auto shards = GetTableShards(server, sender, "/Root/table");
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+ // We need to fill the table with some data
+ Cerr << "========= Upserting initial values =========" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ UPSERT INTO `/Root/table` (key, value)
+ VALUES (1, 1), (11, 11)
+ )"),
+ "<empty>");
+
+ TBlockEvents<TEvDataShard::TEvProposeTransaction> blockedPrepare(runtime);
+
+ Cerr << "========= Starting upsert 1 =========" << Endl;
+ auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+ UPSERT INTO `/Root/table` (key, value)
+ VALUES (2, 2), (12, 12);
+ )");
+
+ runtime.WaitFor("prepare requests", [&]{ return blockedPrepare.size() >= 2; });
+ UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);
+
+ blockedPrepare.Stop();
+
+ Cerr << "========= Restarting shard 1 =========" << Endl;
+ GracefulRestartTablet(runtime, shards.at(0), sender);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
+ "ERROR: UNAVAILABLE");
+ }
+
+ Y_UNIT_TEST(DistributedUpsertRestartAfterPrepare) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableDataShardVolatileTransactions(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ Cerr << "========= Creating the table =========" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSchemeExec(runtime, R"(
+ CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
+ WITH (PARTITION_AT_KEYS = (10));
+ )"),
+ "SUCCESS");
+
+ const auto shards = GetTableShards(server, sender, "/Root/table");
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+ // We need to fill the table with some data
+ Cerr << "========= Upserting initial values =========" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ UPSERT INTO `/Root/table` (key, value)
+ VALUES (1, 1), (11, 11)
+ )"),
+ "<empty>");
+
+ TBlockEvents<TEvDataShard::TEvProposeTransactionResult> blockedPrepare(runtime);
+
+ Cerr << "========= Starting upsert 1 =========" << Endl;
+ auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+ UPSERT INTO `/Root/table` (key, value)
+ VALUES (2, 2), (12, 12);
+ )");
+
+ runtime.WaitFor("prepare results", [&]{ return blockedPrepare.size() >= 2; });
+ UNIT_ASSERT_VALUES_EQUAL(blockedPrepare.size(), 2u);
+
+ for (auto& ev : blockedPrepare) {
+ auto* msg = ev->Get();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Record.GetStatus(), NKikimrTxDataShard::TEvProposeTransactionResult::PREPARED);
+ }
+
+ // Unblock prepare results and restart the first shard
+ blockedPrepare.Stop().Unblock();
+
+ Cerr << "========= Restarting shard 1 =========" << Endl;
+ GracefulRestartTablet(runtime, shards.at(0), sender);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
+ "ERROR: ABORTED");
+ }
+
+ Y_UNIT_TEST(DistributedUpsertRestartAfterPlan) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableDataShardVolatileTransactions(true);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ Cerr << "========= Creating the table =========" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSchemeExec(runtime, R"(
+ CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
+ WITH (PARTITION_AT_KEYS = (10));
+ )"),
+ "SUCCESS");
+
+ const auto shards = GetTableShards(server, sender, "/Root/table");
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+ // We need to fill the table with some data
+ Cerr << "========= Upserting initial values =========" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ UPSERT INTO `/Root/table` (key, value)
+ VALUES (1, 1), (11, 11)
+ )"),
+ "<empty>");
+
+ TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime);
+
+ Cerr << "========= Starting upsert 1 =========" << Endl;
+ auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+ UPSERT INTO `/Root/table` (key, value)
+ VALUES (2, 2), (12, 12);
+ )");
+
+ runtime.WaitFor("shard plans", [&]{ return blockedPlan.size() >= 2; });
+ UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
+
+ // Block TEvPrivate::TEvProgressTransaction for shard1
+ auto shard1actor = ResolveTablet(runtime, shards.at(0));
+ TBlockEvents<IEventHandle> blockedProgress(runtime,
+ [&](const TAutoPtr<IEventHandle>& ev) {
+ return ev->GetRecipientRewrite() == shard1actor &&
+ ev->GetTypeRewrite() == EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0;
+ });
+
+ // Unblock plan steps; shard 1 will stall on the blocked progress event
+ blockedPlan.Stop().Unblock();
+ runtime.WaitFor("blocked progress", [&]{ return blockedProgress.size() >= 1; });
+ UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1u);
+
+ Cerr << "... sleeping for 1 second" << Endl;
+ runtime.SimulateSleep(TDuration::Seconds(1));
+
+ Cerr << "========= Restarting shard 1 =========" << Endl;
+ GracefulRestartTablet(runtime, shards.at(0), sender);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
+ "ERROR: ABORTED");
+
+ Cerr << "========= Checking table =========" << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table`
+ ORDER BY key;
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 11 } items { uint32_value: 11 } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
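
The three new tests share one skeleton and differ only in which protocol step is blocked before the graceful restart: blocking TEvProposeTransaction (before prepare) yields UNAVAILABLE, while blocking TEvProposeTransactionResult or TEvPlanStep plus the progress event (after prepare) yields ABORTED. Condensed, using helpers that appear in the diff (abbreviated, not compilable as-is):

    // Common shape of the three tests above; identifiers come from the diff.
    auto upsertFuture = KqpSimpleSend(runtime, R"(
        UPSERT INTO `/Root/table` (key, value) VALUES (2, 2), (12, 12);
    )");
    // ... wait until the blocked protocol step has been reached ...
    GracefulRestartTablet(runtime, shards.at(0), sender);
    UNIT_ASSERT_VALUES_EQUAL(
        FormatResult(runtime.WaitFuture(std::move(upsertFuture))),
        "ERROR: ABORTED");  // "ERROR: UNAVAILABLE" in the before-prepare variant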
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index 10db8b9c679..e433b50fc3c 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -609,6 +609,34 @@ bool TWriteOperation::OnStopping(TDataShard& self, const TActorContext& ctx) {
// Immediate ops become ready when stopping flag is set
return true;
+ } else if (HasVolatilePrepareFlag()) {
+ // Volatile transactions may be aborted at any time until they are executed.
+ // Note: we need to send the result (and discard the transaction) as
+ // soon as possible, because new transactions are unlikely to execute
+ // and commits are even more likely to fail.
+ if (!HasResultSentFlag() && !Result() && !HasCompletedFlag()) {
+ auto status = NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED;
+ TString reason = TStringBuilder()
+ << "DataShard " << TabletId << " is restarting";
+ auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletId, GetTxId(), status, std::move(reason));
+
+ ctx.Send(GetTarget(), result.release(), 0, GetCookie());
+
+ // Make sure we also send acks and nodata readsets to expecting participants
+ std::vector<std::unique_ptr<IEventHandle>> cleanupReplies;
+ self.GetCleanupReplies(this, cleanupReplies);
+
+ for (auto& ev : cleanupReplies) {
+ TActivationContext::Send(ev.release());
+ }
+
+ SetResultSentFlag();
+ return true;
+ }
+
+ // Executed transactions will have to wait until they are committed;
+ // there is no way to hand off committing volatile transactions for now.
+ return false;
} else {
// Distributed operations send notification when proposed
if (GetTarget() && !HasCompletedFlag()) {