aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2024-12-31 12:40:49 +0300
committerGitHub <noreply@github.com>2024-12-31 12:40:49 +0300
commit0fe447bdd4c8ef80c909e46766ee77e528dff458 (patch)
tree65b1ea3360e5efe5dad01e5b23fd8f4b51381e07
parentd2c7d76d911468a58bafbf345b03139370c60c1e (diff)
downloadydb-0fe447bdd4c8ef80c909e46766ee77e528dff458.tar.gz
The tablet leaves executed transactions (#13134)
-rw-r--r--ydb/core/persqueue/pq_impl.cpp4
-rw-r--r--ydb/core/persqueue/ut/pqtablet_mock.cpp5
-rw-r--r--ydb/core/persqueue/ut/pqtablet_mock.h2
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp134
4 files changed, 141 insertions, 4 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 2bae935362..f2b1bf85d5 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1000,9 +1000,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
SetTxInFlyCounter();
if (tx.HasStep()) {
- if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
- PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
- }
+ PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
}
if (tx.HasWriteId()) {
diff --git a/ydb/core/persqueue/ut/pqtablet_mock.cpp b/ydb/core/persqueue/ut/pqtablet_mock.cpp
index c0e1e9ab08..4aeed8f5d5 100644
--- a/ydb/core/persqueue/ut/pqtablet_mock.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_mock.cpp
@@ -94,7 +94,10 @@ void TPQTabletMock::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorCo
{
Y_UNUSED(ctx);
- ReadSet = ev->Get()->Record;
+ const auto& record = ev->Get()->Record;
+
+ ReadSet = record;
+ ReadSets[std::make_pair(record.GetStep(), record.GetTxId())].push_back(record);
}
void TPQTabletMock::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx)
diff --git a/ydb/core/persqueue/ut/pqtablet_mock.h b/ydb/core/persqueue/ut/pqtablet_mock.h
index 99e8d12ad1..0862ecac03 100644
--- a/ydb/core/persqueue/ut/pqtablet_mock.h
+++ b/ydb/core/persqueue/ut/pqtablet_mock.h
@@ -32,6 +32,8 @@ public:
TMaybe<NKikimrTx::TEvReadSet> ReadSet;
TMaybe<NKikimrTx::TEvReadSetAck> ReadSetAck;
+ THashMap<std::pair<ui64, ui64>, TVector<NKikimrTx::TEvReadSet>> ReadSets;
+
private:
struct TEvPQTablet {
enum EEv {
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 34169c7707..c879a05528 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -139,6 +139,7 @@ protected:
TMaybe<ui64> Target;
TMaybe<NKikimrTx::TReadSetData::EDecision> Decision;
TMaybe<ui64> Producer;
+ TMaybe<size_t> Count;
};
struct TReadSetAckMatcher {
@@ -191,6 +192,7 @@ protected:
void WaitPlanStepAccepted(const TPlanStepAcceptedMatcher& matcher = {});
void WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher);
+ void WaitReadSetEx(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher);
void SendReadSet(const TReadSetParams& params);
void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
@@ -236,6 +238,9 @@ protected:
void StartPQCalcPredicateObserver(size_t& received);
void WaitForPQCalcPredicate(size_t& received, size_t expected);
+ void WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state);
+ void WaitForExecStep(ui64 step);
+
//
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
//
@@ -456,6 +461,15 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS
}
}
+void TPQTabletFixture::WaitReadSetEx(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher)
+{
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return tablet.ReadSets[std::make_pair(*matcher.Step, *matcher.TxId)].size() >= *matcher.Count;
+ };
+ UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
+}
+
void TPQTabletFixture::SendReadSet(const TReadSetParams& params)
{
NKikimrTx::TReadSetData payload;
@@ -952,6 +966,70 @@ void TPQTabletFixture::WaitForPQCalcPredicate(size_t& received, size_t expected)
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
}
+void TPQTabletFixture::WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state)
+{
+ const TString key = GetTxKey(txId);
+
+ while (true) {
+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
+ request->Record.SetCookie(12345);
+ auto cmd = request->Record.AddCmdReadRange();
+ auto range = cmd->MutableRange();
+ range->SetFrom(key);
+ range->SetIncludeFrom(true);
+ range->SetTo(key);
+ range->SetIncludeTo(true);
+ cmd->SetIncludeData(true);
+ SendToPipe(Ctx->Edge, request.release());
+
+ auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ const auto& result = response->Record.GetReadRangeResult(0);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), static_cast<ui32>(NKikimrProto::OK));
+ const auto& pair = result.GetPair(0);
+
+ NKikimrPQ::TTransaction tx;
+ Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue()));
+
+ if (tx.GetState() == state) {
+ return;
+ }
+ }
+
+ UNIT_FAIL("transaction " << txId << " has not entered the " << state << " state");
+}
+
+void TPQTabletFixture::WaitForExecStep(ui64 step)
+{
+ while (true) {
+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
+ request->Record.SetCookie(12345);
+ auto cmd = request->Record.AddCmdReadRange();
+ auto range = cmd->MutableRange();
+ range->SetFrom("_txinfo");
+ range->SetIncludeFrom(true);
+ range->SetTo("_txinfo");
+ range->SetIncludeTo(true);
+ cmd->SetIncludeData(true);
+ SendToPipe(Ctx->Edge, request.release());
+
+ auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
+ const auto& result = response->Record.GetReadRangeResult(0);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), static_cast<ui32>(NKikimrProto::OK));
+ const auto& pair = result.GetPair(0);
+
+ NKikimrPQ::TTabletTxInfo txInfo;
+ Y_ABORT_UNLESS(txInfo.ParseFromString(pair.GetValue()));
+
+ if (txInfo.GetExecStep() == step) {
+ return;
+ }
+ }
+
+ UNIT_FAIL("expected execution step " << step);
+}
+
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
{
TestParallelTransactions("consumer", "consumer");
@@ -1730,6 +1808,62 @@ Y_UNIT_TEST_F(Huge_ProposeTransacton, TPQTabletFixture)
WaitPlanStepAccepted({.Step=100});
}
+Y_UNIT_TEST_F(After_Restarting_The_Tablet_Sends_A_TEvReadSet_For_Transactions_In_The_EXECUTED_State, TPQTabletFixture)
+{
+ const ui64 txId_1 = 67890;
+ const ui64 txId_2 = txId_1 + 1;
+ const ui64 mockTabletId = 22222;
+
+ NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
+ PQTabletPrepare({.partitions=1}, {}, *Ctx);
+
+ // 1st tx
+ SendProposeTransactionRequest({.TxId=txId_1,
+ .Senders={mockTabletId}, .Receivers={mockTabletId},
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ }});
+ WaitProposeTransactionResponse({.TxId=txId_1,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ SendPlanStep({.Step=100, .TxIds={txId_1}});
+
+ WaitForCalcPredicateResult();
+
+ tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId_1, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
+
+ WaitProposeTransactionResponse({.TxId=txId_1,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
+
+ WaitForTxState(txId_1, NKikimrPQ::TTransaction::EXECUTED);
+
+ tablet->ReadSet = Nothing();
+
+ // 2nd tx
+ SendProposeTransactionRequest({.TxId=txId_2,
+ .Senders={mockTabletId}, .Receivers={mockTabletId},
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ }});
+ WaitProposeTransactionResponse({.TxId=txId_2,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ SendPlanStep({.Step=110, .TxIds={txId_2}});
+
+ WaitForCalcPredicateResult();
+
+ WaitReadSetEx(*tablet, {.Step=110, .TxId=txId_2, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=1});
+
+ // the PQ tablet has moved a step forward
+ WaitForExecStep(110);
+
+ // restart PQ tablet
+ PQTabletRestart(*Ctx);
+
+ // the PQ tablet should send a TEvReadSet for the executed transaction
+ WaitReadSetEx(*tablet, {.Step=100, .TxId=txId_1, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=2});
+}
+
}
}