diff options
author | abcdef <[email protected]> | 2023-09-28 17:37:38 +0300 |
---|---|---|
committer | abcdef <[email protected]> | 2023-09-28 18:05:21 +0300 |
commit | ff1b1bc5292f7765c9e6c084291ca72d82f6b551 (patch) | |
tree | 98d48fbd9a2b9e1241da192b596c938ad48e4dd7 | |
parent | ce773f6205e5590916b0aa017880860acfec51f0 (diff) |
TEvReadSet may come before TEvPlanStep
событие `TEvReadSet` может прийти раньше события `TEvPlanStep`
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 20 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 48 |
3 files changed, 51 insertions, 20 deletions
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 900f5f18147..0979f9c26da 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -183,7 +183,7 @@ void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decisi Y_VERIFY(Partitions.contains(event.Partition)); - SetDecision(decision); + SetDecision(SelfDecision, decision); ++PartitionRepliesCount; } @@ -192,17 +192,19 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event, const TActorId& sender, std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack) { - Y_VERIFY(event.HasStep() && (Step == event.GetStep())); + Y_VERIFY((Step == Max<ui64>()) || (event.HasStep() && (Step == event.GetStep()))); Y_VERIFY(event.HasTxId() && (TxId == event.GetTxId())); if (Senders.contains(event.GetTabletProducer())) { NKikimrTx::TReadSetData data; Y_VERIFY(event.HasReadSet() && data.ParseFromString(event.GetReadSet())); - SetDecision(event.GetTabletProducer(), data.GetDecision()); + SetDecision(ParticipantsDecision, data.GetDecision()); ReadSetAcks[sender] = std::move(ack); ++ReadSetCount; + } else { + Y_VERIFY_DEBUG(false, "unknown sender tablet %" PRIu64, event.GetTabletProducer()); } } @@ -224,18 +226,6 @@ void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event ++PartitionRepliesCount; } -void TDistributedTransaction::SetDecision(NKikimrTx::TReadSetData::EDecision decision) -{ - SetDecision(SelfDecision, decision); -} - -void TDistributedTransaction::SetDecision(ui64 tabletId, NKikimrTx::TReadSetData::EDecision decision) -{ - if (Senders.contains(tabletId)) { - SetDecision(ParticipantsDecision, decision); - } -} - auto TDistributedTransaction::GetDecision() const -> EDecision { constexpr EDecision commit = NKikimrTx::TReadSetData::DECISION_COMMIT; diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index b11cba6c53d..1650d33a022 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -66,9 +66,6 @@ struct TDistributedTransaction { bool WriteInProgress = false; - void SetDecision(EDecision decision); - void SetDecision(ui64 tablet, EDecision decision); - EDecision GetDecision() const; bool HaveParticipantsDecision() const; diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 9f770b8fc7d..a6717ee0abf 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -59,7 +59,6 @@ struct TReadSetParams { ui64 Source = 0; ui64 Target = 0; bool Predicate = false; - ui64 SeqNo = 0; }; struct TDropTabletParams { @@ -148,7 +147,7 @@ protected: void WaitPlanStepAccepted(const TPlanStepAcceptedMatcher& matcher = {}); void WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher); - void SendReadSet(NHelpers::TPQTabletMock& tablet, const TReadSetParams& params); + void SendReadSet(const TReadSetParams& params); void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher); void SendReadSetAck(NHelpers::TPQTabletMock& tablet); @@ -359,6 +358,26 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS } } +void TPQTabletFixture::SendReadSet(const TReadSetParams& params) +{ + NKikimrTx::TReadSetData payload; + payload.SetDecision(params.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT); + + TString body; + Y_VERIFY(payload.SerializeToString(&body)); + + auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(params.Step, + params.TxId, + params.Source, + params.Target, + params.Source, + body, + 0); + + SendToPipe(Ctx->Edge, + event.release()); +} + void TPQTabletFixture::WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher) { if (!tablet.ReadSetAck.Defined()) { @@ -917,6 +936,31 @@ Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_Without_Senders, TPQTabletFixture) TestWaitingForTEvReadSet(0, 2); } +Y_UNIT_TEST_F(TEvReadSet_comes_before_TEvPlanStep, TPQTabletFixture) +{ + const ui64 mockTabletId = 22222; + + CreatePQTabletMock(mockTabletId); + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + const ui64 txId = 67890; + + SendProposeTransactionRequest({.TxId=txId, + .Senders={mockTabletId}, .Receivers={mockTabletId}, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=1, .Path="/topic"} + }}); + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendReadSet({.Step=100, .TxId=txId, .Source=mockTabletId, .Target=Ctx->TabletId, .Predicate=true}); + + SendPlanStep({.Step=100, .TxIds={txId}}); + + WaitPlanStepAck({.Step=100, .TxIds={txId}}); // TEvPlanStepAck для координатора + WaitPlanStepAccepted({.Step=100}); +} + } } |