aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-06-13 11:27:17 +0300
committerabcdef <akotov@ydb.tech>2023-06-13 11:27:17 +0300
commit87c877e064be333528c3c811316e5e2f59ea5db9 (patch)
tree6904a681827d75bcf382ce0fd7df693681d62311
parentfae375e6a7457009c5db20fd86358926a9d78903 (diff)
downloadydb-87c877e064be333528c3c811316e5e2f59ea5db9.tar.gz
incorrect invariant condition for the WAIT_RS state
Исправлено условие проверки в состоянии `WAIT_RS`
-rw-r--r--ydb/core/persqueue/pq_impl.cpp8
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp103
2 files changed, 108 insertions, 3 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index a665866d7f..47ec65fe76 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -2885,8 +2885,6 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
SendToPipe(receiverId, tx, std::move(event), ctx);
}
}
-
- tx.ReadSetAcks.clear();
}
void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx,
@@ -3132,7 +3130,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
[[fallthrough]];
case NKikimrPQ::TTransaction::WAIT_RS:
- Y_VERIFY(tx.ReadSetAcks.size() <= tx.Receivers.size());
+ //
+ // the number of TEvReadSetAck sent should not be greater than the number of senders
+ // from TEvProposeTransaction
+ //
+ Y_VERIFY(tx.ReadSetAcks.size() <= tx.Senders.size());
if (tx.HaveParticipantsDecision()) {
SendEvProposeTransactionResult(ctx, tx);
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 0bbb72e165..9f770b8fc7 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -159,6 +159,10 @@ protected:
void StartPQWriteStateObserver();
void WaitForPQWriteState();
+ void WaitForCalcPredicateResult();
+
+ void TestWaitingForTEvReadSet(size_t senders, size_t receivers);
+
bool FoundPQWriteState = false;
//
@@ -420,6 +424,28 @@ void TPQTabletFixture::WaitDropTabletReply(const TDropTabletReplyMatcher& matche
}
}
+void TPQTabletFixture::WaitForCalcPredicateResult()
+{
+ bool found = false;
+
+ auto observer = [&found](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ if (auto* msg = event->CastAsLocal<TEvPQ::TEvTxCalcPredicateResult>()) {
+ found = true;
+ }
+
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+
+ Ctx->Runtime->SetObserverFunc(observer);
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&found]() {
+ return found;
+ };
+
+ UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
+}
+
void TPQTabletFixture::StartPQWriteStateObserver()
{
FoundPQWriteState = false;
@@ -814,6 +840,83 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
}
+void TPQTabletFixture::TestWaitingForTEvReadSet(size_t sendersCount, size_t receiversCount)
+{
+ const ui64 txId = 67890;
+
+ TVector<NHelpers::TPQTabletMock*> tablets;
+ TVector<ui64> senders;
+ TVector<ui64> receivers;
+
+ //
+ // senders
+ //
+ for (size_t i = 0; i < sendersCount; ++i) {
+ senders.push_back(22222 + i);
+ tablets.push_back(CreatePQTabletMock(senders.back()));
+ }
+
+ //
+ // receivers
+ //
+ for (size_t i = 0; i < receiversCount; ++i) {
+ receivers.push_back(33333 + i);
+ tablets.push_back(CreatePQTabletMock(receivers.back()));
+ }
+
+ PQTabletPrepare({.partitions=1}, {}, *Ctx);
+
+ SendProposeTransactionRequest({.TxId=txId,
+ .Senders=senders, .Receivers=receivers,
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}
+ }});
+ WaitProposeTransactionResponse({.TxId=txId,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ SendPlanStep({.Step=100, .TxIds={txId}});
+
+ WaitForCalcPredicateResult();
+
+ //
+ // The tablet received the predicate value from the partition, but has not yet saved the transaction state.
+ // Therefore, the transaction has not yet entered the WAIT_RS state
+ //
+
+ for (size_t i = 0; i < sendersCount; ++i) {
+ tablets[i]->SendReadSet(*Ctx->Runtime,
+ {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
+ }
+
+ WaitProposeTransactionResponse({.TxId=txId,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
+}
+
+Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_When_There_Are_More_Senders_Than_Recipients, TPQTabletFixture)
+{
+ TestWaitingForTEvReadSet(4, 2);
+}
+
+Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_When_There_Are_Fewer_Senders_Than_Recipients, TPQTabletFixture)
+{
+ TestWaitingForTEvReadSet(2, 4);
+}
+
+Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_When_The_Number_Of_Senders_And_Recipients_Match, TPQTabletFixture)
+{
+ TestWaitingForTEvReadSet(2, 2);
+}
+
+Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_Without_Recipients, TPQTabletFixture)
+{
+ TestWaitingForTEvReadSet(2, 0);
+}
+
+Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_Without_Senders, TPQTabletFixture)
+{
+ TestWaitingForTEvReadSet(0, 2);
+}
+
}
}