diff options
author | abcdef <akotov@ydb.tech> | 2023-02-09 12:43:00 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-02-09 12:43:00 +0300 |
commit | f90cda0ca455e4ba0c4818fbb553453a111eb51d (patch) | |
tree | b029804325756ba1e93abd734450769f75157716 | |
parent | 24689527cd888aa8a640ecb5077e656b3520d373 (diff) | |
download | ydb-f90cda0ca455e4ba0c4818fbb553453a111eb51d.tar.gz |
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 99 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 237 |
3 files changed, 300 insertions, 48 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index c2089e97297..58d35a74199 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -901,14 +901,36 @@ void TPersQueue::ReturnTabletState(const TActorContext& ctx, const TChangeNotifi ctx.Send(req.Actor, event.Release()); } -void TPersQueue::ReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status) +void TPersQueue::TryReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status) { - for (auto req : TabletStateRequests) - ReturnTabletState(ctx, req, status); - TabletStateRequests.clear(); + if (AllTransactionsHaveBeenProcessed() && (TabletState == NKikimrPQ::EDropped)) { + for (auto req : TabletStateRequests) { + ReturnTabletState(ctx, req, status); + } + TabletStateRequests.clear(); + } +} + +void TPersQueue::BeginWriteTabletState(const TActorContext& ctx, NKikimrPQ::ETabletState state) +{ + NKikimrPQ::TTabletState stateProto; + stateProto.SetState(state); + TString strState; + bool ok = stateProto.SerializeToString(&strState); + Y_VERIFY(ok); + + TAutoPtr<TEvKeyValue::TEvRequest> kvRequest(new TEvKeyValue::TEvRequest); + kvRequest->Record.SetCookie(WRITE_STATE_COOKIE); + + auto kvCmd = kvRequest->Record.AddCmdWrite(); + kvCmd->SetKey(KeyState()); + kvCmd->SetValue(strState); + kvCmd->SetTactic(AppData(ctx)->PQConfig.GetTactic()); + + ctx.Send(ctx.SelfID, kvRequest.Release()); } -void TPersQueue::HandleStateWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx) +void TPersQueue::EndWriteTabletState(const NKikimrClient::TResponse& resp, const TActorContext& ctx) { bool ok = (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) && (resp.WriteResultSize() == 1) && @@ -922,7 +944,8 @@ void TPersQueue::HandleStateWriteResponse(const NKikimrClient::TResponse& resp, } TabletState = NKikimrPQ::EDropped; - ReturnTabletStateAll(ctx); + + TryReturnTabletStateAll(ctx); } void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) @@ -938,7 +961,7 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& HandleConfigReadResponse(resp, ctx); break; case WRITE_STATE_COOKIE: - HandleStateWriteResponse(resp, ctx); + EndWriteTabletState(resp, ctx); break; case WRITE_TX_COOKIE: EndWriteTxs(resp, ctx); @@ -1343,34 +1366,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvDropTablet::TPtr& ev, const TActorConte TChangeNotification stateRequest(ev->Sender, txId); NKikimrPQ::ETabletState reqState = record.GetRequestedState(); - if (reqState == TabletState) { - ReturnTabletState(ctx, stateRequest, NKikimrProto::OK); - return; - } else if (reqState == NKikimrPQ::ENormal && - TabletState == NKikimrPQ::EDropped) { + + if (reqState == NKikimrPQ::ENormal) { ReturnTabletState(ctx, stateRequest, NKikimrProto::ERROR); return; } - TabletStateRequests.insert(stateRequest); - if (TabletStateRequests.size() > 1) - return; // already sent, just enqueue + Y_VERIFY(reqState == NKikimrPQ::EDropped); - NKikimrPQ::TTabletState stateProto; - stateProto.SetState(record.GetRequestedState()); - TString strState; - bool ok = stateProto.SerializeToString(&strState); - Y_VERIFY(ok); - - TAutoPtr<TEvKeyValue::TEvRequest> kvRequest(new TEvKeyValue::TEvRequest); - kvRequest->Record.SetCookie(WRITE_STATE_COOKIE); - - auto kvCmd = kvRequest->Record.AddCmdWrite(); - kvCmd->SetKey(KeyState()); - kvCmd->SetValue(strState); - kvCmd->SetTactic(AppData(ctx)->PQConfig.GetTactic()); + TabletStateRequests.insert(stateRequest); + if (TabletStateRequests.size() > 1) { + return; // already writing, just enqueue + } - ctx.Send(ctx.SelfID, kvRequest.Release()); + BeginWriteTabletState(ctx, reqState); } void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext& ctx) @@ -2258,6 +2267,13 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc ", is_write=" << isWriteOperation); } + if (TabletState != NKikimrPQ::ENormal) { + SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()), + event.GetTxId(), + ctx); + return; + } + // // TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true // @@ -2274,9 +2290,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc // // FIXME(abcdef): последовательность вызовов Release // - ctx.Send(i->second.Actor, ev.Release()->Release().Release()); + ctx.Send(i->second.Actor, ev->Release().Release()); } else { - EvProposeTransactionQueue.emplace_back(ev.Release()->Release().Release()); + EvProposeTransactionQueue.emplace_back(ev->Release().Release()); TryWriteTxs(ctx); } @@ -2291,7 +2307,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont ", PlanStep: " << event.GetStep() << ", Mediator: " << event.GetMediatorID()); - EvPlanStepQueue.emplace_back(ev->Sender, ev.Release()->Release().Release()); + EvPlanStepQueue.emplace_back(ev->Sender, ev->Release().Release()); TryWriteTxs(ctx); } @@ -2415,6 +2431,7 @@ void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp, } SendReplies(ctx); + TryReturnTabletStateAll(ctx); CheckChangedTxStates(ctx); WriteTxsInProgress = false; @@ -2889,6 +2906,24 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx) ChangedTxs.clear(); } +bool TPersQueue::AllTransactionsHaveBeenProcessed() const +{ + return EvProposeTransactionQueue.empty() && Txs.empty(); +} + +void TPersQueue::SendProposeTransactionAbort(const TActorId& target, + ui64 txId, + const TActorContext& ctx) +{ + auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>(); + + event->Record.SetOrigin(TabletID()); + event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED); + event->Record.SetTxId(txId); + + ctx.Send(target, std::move(event)); +} + ui64 TPersQueue::GetAllowedStep() const { // diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 3abc39f8c6f..da27f90271d 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -134,7 +134,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { DESCRIBE_HANDLE_WITH_SENDER(HandleReserveBytesRequest) #undef DESCRIBE_HANDLE_WITH_SENDER bool ChangingState() const { return !TabletStateRequests.empty(); } - void ReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status = NKikimrProto::OK); + void TryReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status = NKikimrProto::OK); void ReturnTabletState(const TActorContext& ctx, const TChangeNotification& req, NKikimrProto::EReplyStatus status); void SchedulePlanStepAck(ui64 step, @@ -266,6 +266,16 @@ private: void SendReplies(const TActorContext& ctx); void CheckChangedTxStates(const TActorContext& ctx); + + bool AllTransactionsHaveBeenProcessed() const; + + void BeginWriteTabletState(const TActorContext& ctx, NKikimrPQ::ETabletState state); + void EndWriteTabletState(const NKikimrClient::TResponse& resp, + const TActorContext& ctx); + + void SendProposeTransactionAbort(const TActorId& target, + ui64 txId, + const TActorContext& ctx); }; diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 21be59f1124..da7a4587f1f 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -57,6 +57,10 @@ struct TReadSetParams { ui64 SeqNo = 0; }; +struct TDropTabletParams { + ui64 TxId = 0; +}; + using NKikimr::NPQ::NHelpers::CreatePQTabletMock; using TPQTabletMock = NKikimr::NPQ::NHelpers::TPQTabletMock; @@ -110,13 +114,25 @@ protected: TMaybe<ui64> Consumer; }; + struct TDropTabletReplyMatcher { + TMaybe<NKikimrProto::EReplyStatus> Status; + TMaybe<ui64> TxId; + TMaybe<ui64> TabletId; + TMaybe<NKikimrPQ::ETabletState> State; + }; + using TProposeTransactionParams = NHelpers::TProposeTransactionParams; using TPlanStepParams = NHelpers::TPlanStepParams; using TReadSetParams = NHelpers::TReadSetParams; + using TDropTabletParams = NHelpers::TDropTabletParams; void SetUp(NUnitTest::TTestContext&) override; void TearDown(NUnitTest::TTestContext&) override; + void SendToPipe(const TActorId& sender, + IEventBase* event, + ui32 node = 0, ui64 cookie = 0); + void SendProposeTransactionRequest(const TProposeTransactionParams& params); void WaitProposeTransactionResponse(const TProposeTransactionResponseMatcher& matcher = {}); @@ -130,6 +146,14 @@ protected: void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher); void SendReadSetAck(NHelpers::TPQTabletMock& tablet); + void SendDropTablet(const TDropTabletParams& params); + void WaitDropTabletReply(const TDropTabletReplyMatcher& matcher); + + void StartPQWriteStateObserver(); + void WaitForPQWriteState(); + + bool FoundPQWriteState = false; + // // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait // @@ -155,6 +179,28 @@ void TPQTabletFixture::SetUp(NUnitTest::TTestContext&) void TPQTabletFixture::TearDown(NUnitTest::TTestContext&) { + if (Pipe != TActorId()) { + Ctx->Runtime->ClosePipe(Pipe, Ctx->Edge, 0); + } +} + +void TPQTabletFixture::SendToPipe(const TActorId& sender, + IEventBase* event, + ui32 node, ui64 cookie) +{ + if (Pipe == TActorId()) { + Pipe = Ctx->Runtime->ConnectToPipe(Ctx->TabletId, + Ctx->Edge, + 0, + GetPipeConfigWithRetries()); + } + + Y_VERIFY(Pipe != TActorId()); + + Ctx->Runtime->SendToPipe(Pipe, + sender, + event, + node, cookie); } void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params) @@ -194,16 +240,8 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa // event->Record.SetTxId(params.TxId); - if (Pipe == TActorId()) { - Pipe = Ctx->Runtime->ConnectToPipe(Ctx->TabletId, - Ctx->Edge, - 0, - GetPipeConfigWithRetries()); - } - - Ctx->Runtime->SendToPipe(Pipe, - Ctx->Edge, - event.Release()); + SendToPipe(Ctx->Edge, + event.Release()); } void TPQTabletFixture::WaitProposeTransactionResponse(const TProposeTransactionResponseMatcher& matcher) @@ -218,7 +256,7 @@ void TPQTabletFixture::WaitProposeTransactionResponse(const TProposeTransactionR if (matcher.Status) { UNIT_ASSERT(event->Record.HasStatus()); - UNIT_ASSERT(*matcher.Status == event->Record.GetStatus()); + UNIT_ASSERT_EQUAL(*matcher.Status, event->Record.GetStatus()); } } @@ -233,9 +271,8 @@ void TPQTabletFixture::SendPlanStep(const TPlanStepParams& params) ActorIdToProto(Ctx->Edge, tx->MutableAckTo()); } - Ctx->Runtime->SendToPipe(Pipe, - Ctx->Edge, - event.Release()); + SendToPipe(Ctx->Edge, + event.Release()); } void TPQTabletFixture::WaitPlanStepAck(const TPlanStepAckMatcher& matcher) @@ -297,7 +334,7 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS NKikimrTx::TReadSetData data; Y_VERIFY(data.ParseFromString(tablet.ReadSet->GetReadSet())); - UNIT_ASSERT(*matcher.Decision == data.GetDecision()); + UNIT_ASSERT_EQUAL(*matcher.Decision, data.GetDecision()); } if (matcher.Producer.Defined()) { UNIT_ASSERT(tablet.ReadSet->HasTabletProducer()); @@ -337,6 +374,66 @@ void TPQTabletFixture::WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TRe } } +void TPQTabletFixture::SendDropTablet(const TDropTabletParams& params) +{ + auto event = MakeHolder<TEvPersQueue::TEvDropTablet>(); + event->Record.SetTxId(params.TxId); + event->Record.SetRequestedState(NKikimrPQ::EDropped); + + SendToPipe(Ctx->Edge, + event.Release()); +} + +void TPQTabletFixture::WaitDropTabletReply(const TDropTabletReplyMatcher& matcher) +{ + auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvDropTabletReply>(); + UNIT_ASSERT(event != nullptr); + + if (matcher.Status.Defined()) { + UNIT_ASSERT(event->Record.HasStatus()); + UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus()); + } + if (matcher.TxId.Defined()) { + UNIT_ASSERT(event->Record.HasTxId()); + UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, event->Record.GetTxId()); + } + if (matcher.TabletId.Defined()) { + UNIT_ASSERT(event->Record.HasTabletId()); + UNIT_ASSERT_VALUES_EQUAL(*matcher.TabletId, event->Record.GetTabletId()); + } + if (matcher.State.Defined()) { + UNIT_ASSERT(event->Record.HasActualState()); + UNIT_ASSERT_EQUAL(*matcher.State, event->Record.GetActualState()); + } +} + +void TPQTabletFixture::StartPQWriteStateObserver() +{ + FoundPQWriteState = false; + + auto observer = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + if (auto* kvResponse = dynamic_cast<TEvKeyValue::TEvResponse*>(event->GetBase())) { + if ((event->Sender == event->Recipient) && + kvResponse->Record.HasCookie() && + (kvResponse->Record.GetCookie() == 4)) { // TPersQueue::WRITE_STATE_COOKIE + FoundPQWriteState = true; + } + } + + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + Ctx->Runtime->SetObserverFunc(observer); +} + +void TPQTabletFixture::WaitForPQWriteState() +{ + TDispatchOptions options; + options.CustomFinalCondition = [this]() { + return FoundPQWriteState; + }; + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); +} + NHelpers::TPQTabletMock* TPQTabletFixture::CreatePQTabletMock(ui64 tabletId) { NHelpers::TPQTabletMock* mock = nullptr; @@ -524,6 +621,116 @@ Y_UNIT_TEST_F(Partition_Send_Predicate_With_False, TPQTabletFixture) // } +Y_UNIT_TEST_F(DropTablet_And_Tx, TPQTabletFixture) +{ + PQTabletPrepare({.partitions=2}, {}, *Ctx); + + const ui64 txId_1 = 67890; + const ui64 txId_2 = 67891; + + StartPQWriteStateObserver(); + + SendProposeTransactionRequest({.TxId=txId_1, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + SendDropTablet({.TxId=12345}); + + // + // транзакция TxId_1 будет обработана + // + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + WaitForPQWriteState(); + + // + // по транзакции TxId_2 получим отказ + // + SendProposeTransactionRequest({.TxId=txId_2, + .TxOps={ + {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::ABORTED}); + + SendPlanStep({.Step=100, .TxIds={txId_1}}); + + WaitPlanStepAck({.Step=100, .TxIds={txId_1}}); // TEvPlanStepAck для координатора + SendDropTablet({.TxId=67890}); // TEvDropTable когда выполняется транзакция + WaitPlanStepAccepted({.Step=100}); + + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); + + // + // ответы на TEvDropTablet будут после транзакции + // + WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=12345, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped}); + WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=67890, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped}); +} + +Y_UNIT_TEST_F(DropTablet, TPQTabletFixture) +{ + PQTabletPrepare({.partitions=1}, {}, *Ctx); + + // + // транзакций нет, ответ будет сразу + // + SendDropTablet({.TxId=99999}); + WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=99999, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped}); +} + +Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture) +{ + PQTabletPrepare({.partitions=2}, {}, *Ctx); + + const ui64 txId_1 = 67890; + const ui64 txId_2 = 67891; + const ui64 txId_3 = 67892; + + StartPQWriteStateObserver(); + + // + // TEvDropTablet между транзакциями + // + SendProposeTransactionRequest({.TxId=txId_1, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + }}); + SendDropTablet({.TxId=12345}); + SendProposeTransactionRequest({.TxId=txId_2, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"} + }}); + + WaitProposeTransactionResponse({.TxId=txId_1, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + WaitForPQWriteState(); + + SendProposeTransactionRequest({.TxId=txId_3, + .TxOps={ + {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"} + }}); + + // + // транзакция пришла после того как состояние было записано на диск. не будет обработана + // + WaitProposeTransactionResponse({.TxId=txId_3, + .Status=NKikimrPQ::TEvProposeTransactionResult::ABORTED}); + + // + // транзакция пришла до того как состояние было записано на диск. будет обработана + // + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); +} + } } |