aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2025-05-05 12:32:53 +0300
committerGitHub <noreply@github.com>2025-05-05 09:32:53 +0000
commit06defc03ff61e8c258f3a9d9363cac36ad97ebcc (patch)
treee7f57022e0fb69eadd6e8635cd193fb2d87c9d13
parentcdec7c89140a29e3435254205bea0092dc9ee466 (diff)
downloadydb-06defc03ff61e8c258f3a9d9363cac36ad97ebcc.tar.gz
Don't wait for TEvReadSetAck from non-existent tablets (#17913)
-rw-r--r--ydb/core/persqueue/pq_impl.cpp34
-rw-r--r--ydb/core/persqueue/pq_impl.h2
-rw-r--r--ydb/core/persqueue/transaction.cpp9
-rw-r--r--ydb/core/persqueue/transaction.h1
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp90
5 files changed, 133 insertions, 3 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 5ce795aae3e..5adc716597c 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -2909,9 +2909,41 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo
return;
}
+ if (ev->Get()->Dead) {
+ AckReadSetsToTablet(ev->Get()->TabletId, ctx);
+ return;
+ }
+
RestartPipe(ev->Get()->TabletId, ctx);
}
+void TPersQueue::AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx)
+{
+ THashSet<TDistributedTransaction*> txs;
+
+ for (ui64 txId : GetBindedTxs(tabletId)) {
+ auto* tx = GetTransaction(ctx, txId);
+ if (!tx) {
+ continue;
+ }
+
+ tx->OnReadSetAck(tabletId);
+ tx->UnbindMsgsFromPipe(tabletId);
+
+ txs.insert(tx);
+ }
+
+ if (txs.empty()) {
+ return;
+ }
+
+ for (auto* tx : txs) {
+ TryExecuteTxs(ctx, *tx);
+ }
+
+ TryWriteTxs(ctx);
+}
+
void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvTabletPipe::TEvClientDestroyed");
@@ -2924,7 +2956,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo
void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
{
- for (auto& txId: GetBindedTxs(tabletId)) {
+ for (ui64 txId : GetBindedTxs(tabletId)) {
auto* tx = GetTransaction(ctx, txId);
if (!tx) {
continue;
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index a6581237a96..f1f2890043a 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -566,6 +566,8 @@ private:
void AddPendingEvent(IEventHandle* ev);
void ProcessPendingEvents();
+
+ void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx);
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index 25d5ee46f01..cc016d3af4c 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -312,8 +312,13 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event
Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep()));
Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId()));
- if (PredicateRecipients.contains(event.GetTabletConsumer())) {
- PredicateRecipients[event.GetTabletConsumer()] = true;
+ OnReadSetAck(event.GetTabletConsumer());
+}
+
+void TDistributedTransaction::OnReadSetAck(ui64 tabletId)
+{
+ if (PredicateRecipients.contains(tabletId)) {
+ PredicateRecipients[tabletId] = true;
++PredicateAcksCount;
PQ_LOG_D("Predicate acks " << PredicateAcksCount << "/" << PredicateRecipients.size());
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index 69600bd34e6..e9ced8f8b13 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -34,6 +34,7 @@ struct TDistributedTransaction {
const TActorId& sender,
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack);
void OnReadSetAck(const NKikimrTx::TEvReadSetAck& event);
+ void OnReadSetAck(ui64 tabletId);
void OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event);
using EDecision = NKikimrTx::TReadSetData::EDecision;
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index cabdf518416..0a5d6abd35f 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -245,6 +245,8 @@ protected:
void InterceptSaveTxState(TAutoPtr<IEventHandle>& event);
void SendSaveTxState(TAutoPtr<IEventHandle>& event);
+ void WaitForTheTransactionToBeDeleted(ui64 txId);
+
//
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
//
@@ -1082,6 +1084,39 @@ void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
Ctx->Runtime->Send(event);
}
+void TPQTabletFixture::WaitForTheTransactionToBeDeleted(ui64 txId)
+{
+ const TString key = GetTxKey(txId);
+
+ for (size_t i = 0; i < 200; ++i) {
+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
+ request->Record.SetCookie(12345);
+ auto cmd = request->Record.AddCmdReadRange();
+ auto range = cmd->MutableRange();
+ range->SetFrom(key);
+ range->SetIncludeFrom(true);
+ range->SetTo(key);
+ range->SetIncludeTo(true);
+ cmd->SetIncludeData(false);
+ SendToPipe(Ctx->Edge, request.release());
+
+ auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+
+ const auto& result = response->Record.GetReadRangeResult(0);
+ if (result.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) {
+ Ctx->Runtime->SimulateSleep(TDuration::MilliSeconds(300));
+ continue;
+ }
+
+ if (result.GetStatus() == NKikimrProto::NODATA) {
+ return;
+ }
+ }
+
+ UNIT_FAIL("Too many attempts");
+}
+
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
{
TestParallelTransactions("consumer", "consumer");
@@ -2017,6 +2052,61 @@ Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
}
+Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
+{
+ const ui64 txId = 67890;
+ const ui64 mockTabletId = MakeTabletID(false, 22222);
+
+ // We are simulating a situation where the recipient of TEvReadSet has already completed a transaction
+ // and has been deleted.
+ //
+ // To do this, we "forget" the TEvReadSet from the PQ tablet and send TEvClientConnected with the Dead flag
+ // instead of TEvReadSetAck.
+ TTestActorRuntimeBase::TEventFilter prev;
+ auto filter = [&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) -> bool {
+ if (auto* msg = event->CastAsLocal<TEvTxProcessing::TEvReadSet>()) {
+ const auto& r = msg->Record;
+ if (r.GetTabletSource() == Ctx->TabletId) {
+ runtime.Send(event->Sender,
+ Ctx->Edge,
+ new TEvTabletPipe::TEvClientConnected(mockTabletId,
+ NKikimrProto::ERROR,
+ event->Sender,
+ TActorId(),
+ true,
+ true, // Dead
+ 0));
+ return true;
+ }
+ }
+ return false;
+ };
+ prev = Ctx->Runtime->SetEventFilter(filter);
+
+ NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
+ PQTabletPrepare({.partitions=1}, {}, *Ctx);
+
+ SendProposeTransactionRequest({.TxId=txId,
+ .Senders={mockTabletId}, .Receivers={mockTabletId},
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ }});
+ WaitProposeTransactionResponse({.TxId=txId,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ SendPlanStep({.Step=100, .TxIds={txId}});
+
+ // We are sending a TEvReadSet so that the PQ tablet can complete the transaction.
+ tablet->SendReadSet(*Ctx->Runtime,
+ {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
+
+ WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
+
+ // Instead of TEvReadSetAck, the PQ tablet will receive TEvClientConnected with the Dead flag. The transaction
+ // will switch from the WAIT_RS_AKS state to the DELETING state.
+ WaitForTheTransactionToBeDeleted(txId);
+}
+
}
}