summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <[email protected]>2023-09-28 17:37:38 +0300
committerabcdef <[email protected]>2023-09-28 18:05:21 +0300
commitff1b1bc5292f7765c9e6c084291ca72d82f6b551 (patch)
tree98d48fbd9a2b9e1241da192b596c938ad48e4dd7
parentce773f6205e5590916b0aa017880860acfec51f0 (diff)
TEvReadSet may come before TEvPlanStep
событие `TEvReadSet` может прийти раньше события `TEvPlanStep`
-rw-r--r--ydb/core/persqueue/transaction.cpp20
-rw-r--r--ydb/core/persqueue/transaction.h3
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp48
3 files changed, 51 insertions, 20 deletions
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index 900f5f18147..0979f9c26da 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -183,7 +183,7 @@ void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decisi
Y_VERIFY(Partitions.contains(event.Partition));
- SetDecision(decision);
+ SetDecision(SelfDecision, decision);
++PartitionRepliesCount;
}
@@ -192,17 +192,19 @@ void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
const TActorId& sender,
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack)
{
- Y_VERIFY(event.HasStep() && (Step == event.GetStep()));
+ Y_VERIFY((Step == Max<ui64>()) || (event.HasStep() && (Step == event.GetStep())));
Y_VERIFY(event.HasTxId() && (TxId == event.GetTxId()));
if (Senders.contains(event.GetTabletProducer())) {
NKikimrTx::TReadSetData data;
Y_VERIFY(event.HasReadSet() && data.ParseFromString(event.GetReadSet()));
- SetDecision(event.GetTabletProducer(), data.GetDecision());
+ SetDecision(ParticipantsDecision, data.GetDecision());
ReadSetAcks[sender] = std::move(ack);
++ReadSetCount;
+ } else {
+ Y_VERIFY_DEBUG(false, "unknown sender tablet %" PRIu64, event.GetTabletProducer());
}
}
@@ -224,18 +226,6 @@ void TDistributedTransaction::OnTxCommitDone(const TEvPQ::TEvTxCommitDone& event
++PartitionRepliesCount;
}
-void TDistributedTransaction::SetDecision(NKikimrTx::TReadSetData::EDecision decision)
-{
- SetDecision(SelfDecision, decision);
-}
-
-void TDistributedTransaction::SetDecision(ui64 tabletId, NKikimrTx::TReadSetData::EDecision decision)
-{
- if (Senders.contains(tabletId)) {
- SetDecision(ParticipantsDecision, decision);
- }
-}
-
auto TDistributedTransaction::GetDecision() const -> EDecision
{
constexpr EDecision commit = NKikimrTx::TReadSetData::DECISION_COMMIT;
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index b11cba6c53d..1650d33a022 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -66,9 +66,6 @@ struct TDistributedTransaction {
bool WriteInProgress = false;
- void SetDecision(EDecision decision);
- void SetDecision(ui64 tablet, EDecision decision);
-
EDecision GetDecision() const;
bool HaveParticipantsDecision() const;
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 9f770b8fc7d..a6717ee0abf 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -59,7 +59,6 @@ struct TReadSetParams {
ui64 Source = 0;
ui64 Target = 0;
bool Predicate = false;
- ui64 SeqNo = 0;
};
struct TDropTabletParams {
@@ -148,7 +147,7 @@ protected:
void WaitPlanStepAccepted(const TPlanStepAcceptedMatcher& matcher = {});
void WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher);
- void SendReadSet(NHelpers::TPQTabletMock& tablet, const TReadSetParams& params);
+ void SendReadSet(const TReadSetParams& params);
void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
void SendReadSetAck(NHelpers::TPQTabletMock& tablet);
@@ -359,6 +358,26 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS
}
}
+void TPQTabletFixture::SendReadSet(const TReadSetParams& params)
+{
+ NKikimrTx::TReadSetData payload;
+ payload.SetDecision(params.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
+
+ TString body;
+ Y_VERIFY(payload.SerializeToString(&body));
+
+ auto event = std::make_unique<TEvTxProcessing::TEvReadSet>(params.Step,
+ params.TxId,
+ params.Source,
+ params.Target,
+ params.Source,
+ body,
+ 0);
+
+ SendToPipe(Ctx->Edge,
+ event.release());
+}
+
void TPQTabletFixture::WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher)
{
if (!tablet.ReadSetAck.Defined()) {
@@ -917,6 +936,31 @@ Y_UNIT_TEST_F(Test_Waiting_For_TEvReadSet_Without_Senders, TPQTabletFixture)
TestWaitingForTEvReadSet(0, 2);
}
+Y_UNIT_TEST_F(TEvReadSet_comes_before_TEvPlanStep, TPQTabletFixture)
+{
+ const ui64 mockTabletId = 22222;
+
+ CreatePQTabletMock(mockTabletId);
+ PQTabletPrepare({.partitions=1}, {}, *Ctx);
+
+ const ui64 txId = 67890;
+
+ SendProposeTransactionRequest({.TxId=txId,
+ .Senders={mockTabletId}, .Receivers={mockTabletId},
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=1, .Path="/topic"}
+ }});
+ WaitProposeTransactionResponse({.TxId=txId,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ SendReadSet({.Step=100, .TxId=txId, .Source=mockTabletId, .Target=Ctx->TabletId, .Predicate=true});
+
+ SendPlanStep({.Step=100, .TxIds={txId}});
+
+ WaitPlanStepAck({.Step=100, .TxIds={txId}}); // TEvPlanStepAck для координатора
+ WaitPlanStepAccepted({.Step=100});
+}
+
}
}