diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2025-05-05 12:32:53 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-05 09:32:53 +0000 |
commit | 06defc03ff61e8c258f3a9d9363cac36ad97ebcc (patch) | |
tree | e7f57022e0fb69eadd6e8635cd193fb2d87c9d13 | |
parent | cdec7c89140a29e3435254205bea0092dc9ee466 (diff) | |
download | ydb-06defc03ff61e8c258f3a9d9363cac36ad97ebcc.tar.gz |
Don't wait for TEvReadSetAck from non-existent tablets (#17913)
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 34 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 9 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 90 |
5 files changed, 133 insertions, 3 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 5ce795aae3e..5adc716597c 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -2909,9 +2909,41 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo return; } + if (ev->Get()->Dead) { + AckReadSetsToTablet(ev->Get()->TabletId, ctx); + return; + } + RestartPipe(ev->Get()->TabletId, ctx); } +void TPersQueue::AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx) +{ + THashSet<TDistributedTransaction*> txs; + + for (ui64 txId : GetBindedTxs(tabletId)) { + auto* tx = GetTransaction(ctx, txId); + if (!tx) { + continue; + } + + tx->OnReadSetAck(tabletId); + tx->UnbindMsgsFromPipe(tabletId); + + txs.insert(tx); + } + + if (txs.empty()) { + return; + } + + for (auto* tx : txs) { + TryExecuteTxs(ctx, *tx); + } + + TryWriteTxs(ctx); +} + void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { PQ_LOG_D("Handle TEvTabletPipe::TEvClientDestroyed"); @@ -2924,7 +2956,7 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActo void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx) { - for (auto& txId: GetBindedTxs(tabletId)) { + for (ui64 txId : GetBindedTxs(tabletId)) { auto* tx = GetTransaction(ctx, txId); if (!tx) { continue; diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index a6581237a96..f1f2890043a 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -566,6 +566,8 @@ private: void AddPendingEvent(IEventHandle* ev); void ProcessPendingEvents(); + + void AckReadSetsToTablet(ui64 tabletId, const TActorContext& ctx); }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 25d5ee46f01..cc016d3af4c 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -312,8 +312,13 @@ void TDistributedTransaction::OnReadSetAck(const NKikimrTx::TEvReadSetAck& event Y_ABORT_UNLESS(event.HasStep() && (Step == event.GetStep())); Y_ABORT_UNLESS(event.HasTxId() && (TxId == event.GetTxId())); - if (PredicateRecipients.contains(event.GetTabletConsumer())) { - PredicateRecipients[event.GetTabletConsumer()] = true; + OnReadSetAck(event.GetTabletConsumer()); +} + +void TDistributedTransaction::OnReadSetAck(ui64 tabletId) +{ + if (PredicateRecipients.contains(tabletId)) { + PredicateRecipients[tabletId] = true; ++PredicateAcksCount; PQ_LOG_D("Predicate acks " << PredicateAcksCount << "/" << PredicateRecipients.size()); diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 69600bd34e6..e9ced8f8b13 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -34,6 +34,7 @@ struct TDistributedTransaction { const TActorId& sender, std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack); void OnReadSetAck(const NKikimrTx::TEvReadSetAck& event); + void OnReadSetAck(ui64 tabletId); void OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event); using EDecision = NKikimrTx::TReadSetData::EDecision; diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index cabdf518416..0a5d6abd35f 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -245,6 +245,8 @@ protected: void InterceptSaveTxState(TAutoPtr<IEventHandle>& event); void SendSaveTxState(TAutoPtr<IEventHandle>& event); + void WaitForTheTransactionToBeDeleted(ui64 txId); + // // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait // @@ -1082,6 +1084,39 @@ void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event) Ctx->Runtime->Send(event); } +void TPQTabletFixture::WaitForTheTransactionToBeDeleted(ui64 txId) +{ + const TString key = GetTxKey(txId); + + for (size_t i = 0; i < 200; ++i) { + auto request = std::make_unique<TEvKeyValue::TEvRequest>(); + request->Record.SetCookie(12345); + auto cmd = request->Record.AddCmdReadRange(); + auto range = cmd->MutableRange(); + range->SetFrom(key); + range->SetIncludeFrom(true); + range->SetTo(key); + range->SetIncludeTo(true); + cmd->SetIncludeData(false); + SendToPipe(Ctx->Edge, request.release()); + + auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + + const auto& result = response->Record.GetReadRangeResult(0); + if (result.GetStatus() == static_cast<ui32>(NKikimrProto::OK)) { + Ctx->Runtime->SimulateSleep(TDuration::MilliSeconds(300)); + continue; + } + + if (result.GetStatus() == NKikimrProto::NODATA) { + return; + } + } + + UNIT_FAIL("Too many attempts"); +} + Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture) { TestParallelTransactions("consumer", "consumer"); @@ -2017,6 +2052,61 @@ Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture) WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId}); } +Y_UNIT_TEST_F(TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture) +{ + const ui64 txId = 67890; + const ui64 mockTabletId = MakeTabletID(false, 22222); + + // We are simulating a situation where the recipient of TEvReadSet has already completed a transaction + // and has been deleted. + // + // To do this, we "forget" the TEvReadSet from the PQ tablet and send TEvClientConnected with the Dead flag + // instead of TEvReadSetAck. + TTestActorRuntimeBase::TEventFilter prev; + auto filter = [&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) -> bool { + if (auto* msg = event->CastAsLocal<TEvTxProcessing::TEvReadSet>()) { + const auto& r = msg->Record; + if (r.GetTabletSource() == Ctx->TabletId) { + runtime.Send(event->Sender, + Ctx->Edge, + new TEvTabletPipe::TEvClientConnected(mockTabletId, + NKikimrProto::ERROR, + event->Sender, + TActorId(), + true, + true, // Dead + 0)); + return true; + } + } + return false; + }; + prev = Ctx->Runtime->SetEventFilter(filter); + + NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId); + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + SendProposeTransactionRequest({.TxId=txId, + .Senders={mockTabletId}, .Receivers={mockTabletId}, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendPlanStep({.Step=100, .TxIds={txId}}); + + // We are sending a TEvReadSet so that the PQ tablet can complete the transaction. + tablet->SendReadSet(*Ctx->Runtime, + {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + + WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); + + // Instead of TEvReadSetAck, the PQ tablet will receive TEvClientConnected with the Dead flag. The transaction + // will switch from the WAIT_RS_AKS state to the DELETING state. + WaitForTheTransactionToBeDeleted(txId); +} + } } |