diff options
author | abcdef <akotov@ydb.tech> | 2023-06-13 11:27:17 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-06-13 11:27:17 +0300 |
commit | 87c877e064be333528c3c811316e5e2f59ea5db9 (patch) | |
tree | 6904a681827d75bcf382ce0fd7df693681d62311 | |
parent | fae375e6a7457009c5db20fd86358926a9d78903 (diff) | |
download | ydb-87c877e064be333528c3c811316e5e2f59ea5db9.tar.gz |
incorrect invariant condition for the WAIT_RS state
Исправлено условие проверки в состоянии `WAIT_RS`
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 8 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 103 |
2 files changed, 108 insertions, 3 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index a665866d7f..47ec65fe76 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -2885,8 +2885,6 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, SendToPipe(receiverId, tx, std::move(event), ctx); } } - - tx.ReadSetAcks.clear(); } void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx, @@ -3132,7 +3130,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, [[fallthrough]]; case NKikimrPQ::TTransaction::WAIT_RS: - Y_VERIFY(tx.ReadSetAcks.size() <= tx.Receivers.size()); + // + // the number of TEvReadSetAck sent should not be greater than the number of senders + // from TEvProposeTransaction + // + Y_VERIFY(tx.ReadSetAcks.size() <= tx.Senders.size()); if (tx.HaveParticipantsDecision()) { SendEvProposeTransactionResult(ctx, tx); diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 0bbb72e165..9f770b8fc7 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -159,6 +159,10 @@ protected: void StartPQWriteStateObserver(); void WaitForPQWriteState(); + void WaitForCalcPredicateResult(); + + void TestWaitingForTEvReadSet(size_t senders, size_t receivers); + bool FoundPQWriteState = false; // @@ -420,6 +424,28 @@ void TPQTabletFixture::WaitDropTabletReply(const TDropTabletReplyMatcher& matche } } +void TPQTabletFixture::WaitForCalcPredicateResult() +{ + bool found = false; + + auto observer = [&found](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + if (auto* msg = event->CastAsLocal<TEvPQ::TEvTxCalcPredicateResult>()) { + found = true; + } + + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + + Ctx->Runtime->SetObserverFunc(observer); + + TDispatchOptions options; + options.CustomFinalCondition = [&found]() { + return found; + }; + + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); +} + void TPQTabletFixture::StartPQWriteStateObserver() { FoundPQWriteState = false; @@ -814,6 +840,83 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); } +void TPQTabletFixture::TestWaitingForTEvReadSet(size_t sendersCount, size_t receiversCount) +{ + const ui64 txId = 67890; + + TVector<NHelpers::TPQTabletMock*> tablets; + TVector<ui64> senders; + TVector<ui64> receivers; + + // + // senders + // + for (size_t i = 0; i < sendersCount; ++i) { + senders.push_back(22222 + i); + tablets.push_back(CreatePQTabletMock(senders.back())); + } + + // + // receivers + // + for (size_t i = 0; i < receiversCount; ++i) { + receivers.push_back(33333 + i); + tablets.push_back(CreatePQTabletMock(receivers.back())); + } + + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + SendProposeTransactionRequest({.TxId=txId, + .Senders=senders, .Receivers=receivers, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"} + }}); + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendPlanStep({.Step=100, .TxIds={txId}}); + + WaitForCalcPredicateResult(); + + // + // The tablet received the predicate value from the partition, but has not yet saved the transaction state. + // Therefore, the transaction has not yet entered the WAIT_RS state + // + + for (size_t i = 0; i < sendersCount; ++i) { + tablets[i]->SendReadSet(*Ctx->Runtime, + {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + } + + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); +} + +Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_When_There_Are_More_Senders_Than_Recipients, TPQTabletFixture) +{ + TestWaitingForTEvReadSet(4, 2); +} + +Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_When_There_Are_Fewer_Senders_Than_Recipients, TPQTabletFixture) +{ + TestWaitingForTEvReadSet(2, 4); +} + +Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_When_The_Number_Of_Senders_And_Recipients_Match, TPQTabletFixture) +{ + TestWaitingForTEvReadSet(2, 2); +} + +Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_Without_Recipients, TPQTabletFixture) +{ + TestWaitingForTEvReadSet(2, 0); +} + +Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_Without_Senders, TPQTabletFixture) +{ + TestWaitingForTEvReadSet(0, 2); +} + } } |