diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2024-12-31 12:40:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-31 12:40:49 +0300 |
commit | 0fe447bdd4c8ef80c909e46766ee77e528dff458 (patch) | |
tree | 65b1ea3360e5efe5dad01e5b23fd8f4b51381e07 | |
parent | d2c7d76d911468a58bafbf345b03139370c60c1e (diff) | |
download | ydb-0fe447bdd4c8ef80c909e46766ee77e528dff458.tar.gz |
The tablet leaves executed transactions (#13134)
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_mock.cpp | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_mock.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 134 |
4 files changed, 141 insertions, 4 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 2bae935362..f2b1bf85d5 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1000,9 +1000,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& SetTxInFlyCounter(); if (tx.HasStep()) { - if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) { - PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); - } + PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); } if (tx.HasWriteId()) { diff --git a/ydb/core/persqueue/ut/pqtablet_mock.cpp b/ydb/core/persqueue/ut/pqtablet_mock.cpp index c0e1e9ab08..4aeed8f5d5 100644 --- a/ydb/core/persqueue/ut/pqtablet_mock.cpp +++ b/ydb/core/persqueue/ut/pqtablet_mock.cpp @@ -94,7 +94,10 @@ void TPQTabletMock::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorCo { Y_UNUSED(ctx); - ReadSet = ev->Get()->Record; + const auto& record = ev->Get()->Record; + + ReadSet = record; + ReadSets[std::make_pair(record.GetStep(), record.GetTxId())].push_back(record); } void TPQTabletMock::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx) diff --git a/ydb/core/persqueue/ut/pqtablet_mock.h b/ydb/core/persqueue/ut/pqtablet_mock.h index 99e8d12ad1..0862ecac03 100644 --- a/ydb/core/persqueue/ut/pqtablet_mock.h +++ b/ydb/core/persqueue/ut/pqtablet_mock.h @@ -32,6 +32,8 @@ public: TMaybe<NKikimrTx::TEvReadSet> ReadSet; TMaybe<NKikimrTx::TEvReadSetAck> ReadSetAck; + THashMap<std::pair<ui64, ui64>, TVector<NKikimrTx::TEvReadSet>> ReadSets; + private: struct TEvPQTablet { enum EEv { diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 34169c7707..c879a05528 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -139,6 +139,7 @@ protected: TMaybe<ui64> Target; TMaybe<NKikimrTx::TReadSetData::EDecision> Decision; TMaybe<ui64> Producer; + TMaybe<size_t> Count; }; struct TReadSetAckMatcher { @@ -191,6 +192,7 @@ protected: void WaitPlanStepAccepted(const TPlanStepAcceptedMatcher& matcher = {}); void WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher); + void WaitReadSetEx(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher); void SendReadSet(const TReadSetParams& params); void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher); @@ -236,6 +238,9 @@ protected: void StartPQCalcPredicateObserver(size_t& received); void WaitForPQCalcPredicate(size_t& received, size_t expected); + void WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state); + void WaitForExecStep(ui64 step); + // // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait // @@ -456,6 +461,15 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS } } +void TPQTabletFixture::WaitReadSetEx(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher) +{ + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return tablet.ReadSets[std::make_pair(*matcher.Step, *matcher.TxId)].size() >= *matcher.Count; + }; + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); +} + void TPQTabletFixture::SendReadSet(const TReadSetParams& params) { NKikimrTx::TReadSetData payload; @@ -952,6 +966,70 @@ void TPQTabletFixture::WaitForPQCalcPredicate(size_t& received, size_t expected) UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); } +void TPQTabletFixture::WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state) +{ + const TString key = GetTxKey(txId); + + while (true) { + auto request = std::make_unique<TEvKeyValue::TEvRequest>(); + request->Record.SetCookie(12345); + auto cmd = request->Record.AddCmdReadRange(); + auto range = cmd->MutableRange(); + range->SetFrom(key); + range->SetIncludeFrom(true); + range->SetTo(key); + range->SetIncludeTo(true); + cmd->SetIncludeData(true); + SendToPipe(Ctx->Edge, request.release()); + + auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + const auto& result = response->Record.GetReadRangeResult(0); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), static_cast<ui32>(NKikimrProto::OK)); + const auto& pair = result.GetPair(0); + + NKikimrPQ::TTransaction tx; + Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue())); + + if (tx.GetState() == state) { + return; + } + } + + UNIT_FAIL("transaction " << txId << " has not entered the " << state << " state"); +} + +void TPQTabletFixture::WaitForExecStep(ui64 step) +{ + while (true) { + auto request = std::make_unique<TEvKeyValue::TEvRequest>(); + request->Record.SetCookie(12345); + auto cmd = request->Record.AddCmdReadRange(); + auto range = cmd->MutableRange(); + range->SetFrom("_txinfo"); + range->SetIncludeFrom(true); + range->SetTo("_txinfo"); + range->SetIncludeTo(true); + cmd->SetIncludeData(true); + SendToPipe(Ctx->Edge, request.release()); + + auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK); + const auto& result = response->Record.GetReadRangeResult(0); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), static_cast<ui32>(NKikimrProto::OK)); + const auto& pair = result.GetPair(0); + + NKikimrPQ::TTabletTxInfo txInfo; + Y_ABORT_UNLESS(txInfo.ParseFromString(pair.GetValue())); + + if (txInfo.GetExecStep() == step) { + return; + } + } + + UNIT_FAIL("expected execution step " << step); +} + Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture) { TestParallelTransactions("consumer", "consumer"); @@ -1730,6 +1808,62 @@ Y_UNIT_TEST_F(Huge_ProposeTransacton, TPQTabletFixture) WaitPlanStepAccepted({.Step=100}); } +Y_UNIT_TEST_F(After_Restarting_The_Tablet_Sends_A_TEvReadSet_For_Transactions_In_The_EXECUTED_State, TPQTabletFixture) +{ + const ui64 txId_1 = 67890; + const ui64 txId_2 = txId_1 + 1; + const ui64 mockTabletId = 22222; + + NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId); + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + // 1st tx + SendProposeTransactionRequest({.TxId=txId_1, + .Senders={mockTabletId}, .Receivers={mockTabletId}, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendPlanStep({.Step=100, .TxIds={txId_1}}); + + WaitForCalcPredicateResult(); + + tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId_1, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT}); + + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); + + WaitForTxState(txId_1, NKikimrPQ::TTransaction::EXECUTED); + + tablet->ReadSet = Nothing(); + + // 2nd tx + SendProposeTransactionRequest({.TxId=txId_2, + .Senders={mockTabletId}, .Receivers={mockTabletId}, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendPlanStep({.Step=110, .TxIds={txId_2}}); + + WaitForCalcPredicateResult(); + + WaitReadSetEx(*tablet, {.Step=110, .TxId=txId_2, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=1}); + + // the PQ tablet has moved a step forward + WaitForExecStep(110); + + // restart PQ tablet + PQTabletRestart(*Ctx); + + // the PQ tablet should send a TEvReadSet for the executed transaction + WaitReadSetEx(*tablet, {.Step=100, .TxId=txId_1, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=2}); +} + } } |