aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-02-09 12:43:00 +0300
committerabcdef <akotov@ydb.tech>2023-02-09 12:43:00 +0300
commitf90cda0ca455e4ba0c4818fbb553453a111eb51d (patch)
treeb029804325756ba1e93abd734450769f75157716
parent24689527cd888aa8a640ecb5077e656b3520d373 (diff)
downloadydb-f90cda0ca455e4ba0c4818fbb553453a111eb51d.tar.gz
-rw-r--r--ydb/core/persqueue/pq_impl.cpp99
-rw-r--r--ydb/core/persqueue/pq_impl.h12
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp237
3 files changed, 300 insertions, 48 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index c2089e97297..58d35a74199 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -901,14 +901,36 @@ void TPersQueue::ReturnTabletState(const TActorContext& ctx, const TChangeNotifi
ctx.Send(req.Actor, event.Release());
}
-void TPersQueue::ReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status)
+void TPersQueue::TryReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status)
{
- for (auto req : TabletStateRequests)
- ReturnTabletState(ctx, req, status);
- TabletStateRequests.clear();
+ if (AllTransactionsHaveBeenProcessed() && (TabletState == NKikimrPQ::EDropped)) {
+ for (auto req : TabletStateRequests) {
+ ReturnTabletState(ctx, req, status);
+ }
+ TabletStateRequests.clear();
+ }
+}
+
+void TPersQueue::BeginWriteTabletState(const TActorContext& ctx, NKikimrPQ::ETabletState state)
+{
+ NKikimrPQ::TTabletState stateProto;
+ stateProto.SetState(state);
+ TString strState;
+ bool ok = stateProto.SerializeToString(&strState);
+ Y_VERIFY(ok);
+
+ TAutoPtr<TEvKeyValue::TEvRequest> kvRequest(new TEvKeyValue::TEvRequest);
+ kvRequest->Record.SetCookie(WRITE_STATE_COOKIE);
+
+ auto kvCmd = kvRequest->Record.AddCmdWrite();
+ kvCmd->SetKey(KeyState());
+ kvCmd->SetValue(strState);
+ kvCmd->SetTactic(AppData(ctx)->PQConfig.GetTactic());
+
+ ctx.Send(ctx.SelfID, kvRequest.Release());
}
-void TPersQueue::HandleStateWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
+void TPersQueue::EndWriteTabletState(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
{
bool ok = (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) &&
(resp.WriteResultSize() == 1) &&
@@ -922,7 +944,8 @@ void TPersQueue::HandleStateWriteResponse(const NKikimrClient::TResponse& resp,
}
TabletState = NKikimrPQ::EDropped;
- ReturnTabletStateAll(ctx);
+
+ TryReturnTabletStateAll(ctx);
}
void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx)
@@ -938,7 +961,7 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
HandleConfigReadResponse(resp, ctx);
break;
case WRITE_STATE_COOKIE:
- HandleStateWriteResponse(resp, ctx);
+ EndWriteTabletState(resp, ctx);
break;
case WRITE_TX_COOKIE:
EndWriteTxs(resp, ctx);
@@ -1343,34 +1366,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvDropTablet::TPtr& ev, const TActorConte
TChangeNotification stateRequest(ev->Sender, txId);
NKikimrPQ::ETabletState reqState = record.GetRequestedState();
- if (reqState == TabletState) {
- ReturnTabletState(ctx, stateRequest, NKikimrProto::OK);
- return;
- } else if (reqState == NKikimrPQ::ENormal &&
- TabletState == NKikimrPQ::EDropped) {
+
+ if (reqState == NKikimrPQ::ENormal) {
ReturnTabletState(ctx, stateRequest, NKikimrProto::ERROR);
return;
}
- TabletStateRequests.insert(stateRequest);
- if (TabletStateRequests.size() > 1)
- return; // already sent, just enqueue
+ Y_VERIFY(reqState == NKikimrPQ::EDropped);
- NKikimrPQ::TTabletState stateProto;
- stateProto.SetState(record.GetRequestedState());
- TString strState;
- bool ok = stateProto.SerializeToString(&strState);
- Y_VERIFY(ok);
-
- TAutoPtr<TEvKeyValue::TEvRequest> kvRequest(new TEvKeyValue::TEvRequest);
- kvRequest->Record.SetCookie(WRITE_STATE_COOKIE);
-
- auto kvCmd = kvRequest->Record.AddCmdWrite();
- kvCmd->SetKey(KeyState());
- kvCmd->SetValue(strState);
- kvCmd->SetTactic(AppData(ctx)->PQConfig.GetTactic());
+ TabletStateRequests.insert(stateRequest);
+ if (TabletStateRequests.size() > 1) {
+ return; // already writing, just enqueue
+ }
- ctx.Send(ctx.SelfID, kvRequest.Release());
+ BeginWriteTabletState(ctx, reqState);
}
void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext& ctx)
@@ -2258,6 +2267,13 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
", is_write=" << isWriteOperation);
}
+ if (TabletState != NKikimrPQ::ENormal) {
+ SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()),
+ event.GetTxId(),
+ ctx);
+ return;
+ }
+
//
// TODO(abcdef): сохранить пока инициализируемся. TEvPersQueue::TEvHasDataInfo::TPtr как образец. не только конфиг. Inited==true
//
@@ -2274,9 +2290,9 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
//
// FIXME(abcdef): последовательность вызовов Release
//
- ctx.Send(i->second.Actor, ev.Release()->Release().Release());
+ ctx.Send(i->second.Actor, ev->Release().Release());
} else {
- EvProposeTransactionQueue.emplace_back(ev.Release()->Release().Release());
+ EvProposeTransactionQueue.emplace_back(ev->Release().Release());
TryWriteTxs(ctx);
}
@@ -2291,7 +2307,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont
", PlanStep: " << event.GetStep() <<
", Mediator: " << event.GetMediatorID());
- EvPlanStepQueue.emplace_back(ev->Sender, ev.Release()->Release().Release());
+ EvPlanStepQueue.emplace_back(ev->Sender, ev->Release().Release());
TryWriteTxs(ctx);
}
@@ -2415,6 +2431,7 @@ void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp,
}
SendReplies(ctx);
+ TryReturnTabletStateAll(ctx);
CheckChangedTxStates(ctx);
WriteTxsInProgress = false;
@@ -2889,6 +2906,24 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
ChangedTxs.clear();
}
+bool TPersQueue::AllTransactionsHaveBeenProcessed() const
+{
+ return EvProposeTransactionQueue.empty() && Txs.empty();
+}
+
+void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
+ ui64 txId,
+ const TActorContext& ctx)
+{
+ auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
+
+ event->Record.SetOrigin(TabletID());
+ event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED);
+ event->Record.SetTxId(txId);
+
+ ctx.Send(target, std::move(event));
+}
+
ui64 TPersQueue::GetAllowedStep() const
{
//
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 3abc39f8c6f..da27f90271d 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -134,7 +134,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
DESCRIBE_HANDLE_WITH_SENDER(HandleReserveBytesRequest)
#undef DESCRIBE_HANDLE_WITH_SENDER
bool ChangingState() const { return !TabletStateRequests.empty(); }
- void ReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status = NKikimrProto::OK);
+ void TryReturnTabletStateAll(const TActorContext& ctx, NKikimrProto::EReplyStatus status = NKikimrProto::OK);
void ReturnTabletState(const TActorContext& ctx, const TChangeNotification& req, NKikimrProto::EReplyStatus status);
void SchedulePlanStepAck(ui64 step,
@@ -266,6 +266,16 @@ private:
void SendReplies(const TActorContext& ctx);
void CheckChangedTxStates(const TActorContext& ctx);
+
+ bool AllTransactionsHaveBeenProcessed() const;
+
+ void BeginWriteTabletState(const TActorContext& ctx, NKikimrPQ::ETabletState state);
+ void EndWriteTabletState(const NKikimrClient::TResponse& resp,
+ const TActorContext& ctx);
+
+ void SendProposeTransactionAbort(const TActorId& target,
+ ui64 txId,
+ const TActorContext& ctx);
};
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 21be59f1124..da7a4587f1f 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -57,6 +57,10 @@ struct TReadSetParams {
ui64 SeqNo = 0;
};
+struct TDropTabletParams {
+ ui64 TxId = 0;
+};
+
using NKikimr::NPQ::NHelpers::CreatePQTabletMock;
using TPQTabletMock = NKikimr::NPQ::NHelpers::TPQTabletMock;
@@ -110,13 +114,25 @@ protected:
TMaybe<ui64> Consumer;
};
+ struct TDropTabletReplyMatcher {
+ TMaybe<NKikimrProto::EReplyStatus> Status;
+ TMaybe<ui64> TxId;
+ TMaybe<ui64> TabletId;
+ TMaybe<NKikimrPQ::ETabletState> State;
+ };
+
using TProposeTransactionParams = NHelpers::TProposeTransactionParams;
using TPlanStepParams = NHelpers::TPlanStepParams;
using TReadSetParams = NHelpers::TReadSetParams;
+ using TDropTabletParams = NHelpers::TDropTabletParams;
void SetUp(NUnitTest::TTestContext&) override;
void TearDown(NUnitTest::TTestContext&) override;
+ void SendToPipe(const TActorId& sender,
+ IEventBase* event,
+ ui32 node = 0, ui64 cookie = 0);
+
void SendProposeTransactionRequest(const TProposeTransactionParams& params);
void WaitProposeTransactionResponse(const TProposeTransactionResponseMatcher& matcher = {});
@@ -130,6 +146,14 @@ protected:
void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
void SendReadSetAck(NHelpers::TPQTabletMock& tablet);
+ void SendDropTablet(const TDropTabletParams& params);
+ void WaitDropTabletReply(const TDropTabletReplyMatcher& matcher);
+
+ void StartPQWriteStateObserver();
+ void WaitForPQWriteState();
+
+ bool FoundPQWriteState = false;
+
//
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
//
@@ -155,6 +179,28 @@ void TPQTabletFixture::SetUp(NUnitTest::TTestContext&)
void TPQTabletFixture::TearDown(NUnitTest::TTestContext&)
{
+ if (Pipe != TActorId()) {
+ Ctx->Runtime->ClosePipe(Pipe, Ctx->Edge, 0);
+ }
+}
+
+void TPQTabletFixture::SendToPipe(const TActorId& sender,
+ IEventBase* event,
+ ui32 node, ui64 cookie)
+{
+ if (Pipe == TActorId()) {
+ Pipe = Ctx->Runtime->ConnectToPipe(Ctx->TabletId,
+ Ctx->Edge,
+ 0,
+ GetPipeConfigWithRetries());
+ }
+
+ Y_VERIFY(Pipe != TActorId());
+
+ Ctx->Runtime->SendToPipe(Pipe,
+ sender,
+ event,
+ node, cookie);
}
void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionParams& params)
@@ -194,16 +240,8 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
//
event->Record.SetTxId(params.TxId);
- if (Pipe == TActorId()) {
- Pipe = Ctx->Runtime->ConnectToPipe(Ctx->TabletId,
- Ctx->Edge,
- 0,
- GetPipeConfigWithRetries());
- }
-
- Ctx->Runtime->SendToPipe(Pipe,
- Ctx->Edge,
- event.Release());
+ SendToPipe(Ctx->Edge,
+ event.Release());
}
void TPQTabletFixture::WaitProposeTransactionResponse(const TProposeTransactionResponseMatcher& matcher)
@@ -218,7 +256,7 @@ void TPQTabletFixture::WaitProposeTransactionResponse(const TProposeTransactionR
if (matcher.Status) {
UNIT_ASSERT(event->Record.HasStatus());
- UNIT_ASSERT(*matcher.Status == event->Record.GetStatus());
+ UNIT_ASSERT_EQUAL(*matcher.Status, event->Record.GetStatus());
}
}
@@ -233,9 +271,8 @@ void TPQTabletFixture::SendPlanStep(const TPlanStepParams& params)
ActorIdToProto(Ctx->Edge, tx->MutableAckTo());
}
- Ctx->Runtime->SendToPipe(Pipe,
- Ctx->Edge,
- event.Release());
+ SendToPipe(Ctx->Edge,
+ event.Release());
}
void TPQTabletFixture::WaitPlanStepAck(const TPlanStepAckMatcher& matcher)
@@ -297,7 +334,7 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS
NKikimrTx::TReadSetData data;
Y_VERIFY(data.ParseFromString(tablet.ReadSet->GetReadSet()));
- UNIT_ASSERT(*matcher.Decision == data.GetDecision());
+ UNIT_ASSERT_EQUAL(*matcher.Decision, data.GetDecision());
}
if (matcher.Producer.Defined()) {
UNIT_ASSERT(tablet.ReadSet->HasTabletProducer());
@@ -337,6 +374,66 @@ void TPQTabletFixture::WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TRe
}
}
+void TPQTabletFixture::SendDropTablet(const TDropTabletParams& params)
+{
+ auto event = MakeHolder<TEvPersQueue::TEvDropTablet>();
+ event->Record.SetTxId(params.TxId);
+ event->Record.SetRequestedState(NKikimrPQ::EDropped);
+
+ SendToPipe(Ctx->Edge,
+ event.Release());
+}
+
+void TPQTabletFixture::WaitDropTabletReply(const TDropTabletReplyMatcher& matcher)
+{
+ auto event = Ctx->Runtime->GrabEdgeEvent<TEvPersQueue::TEvDropTabletReply>();
+ UNIT_ASSERT(event != nullptr);
+
+ if (matcher.Status.Defined()) {
+ UNIT_ASSERT(event->Record.HasStatus());
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Status, event->Record.GetStatus());
+ }
+ if (matcher.TxId.Defined()) {
+ UNIT_ASSERT(event->Record.HasTxId());
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, event->Record.GetTxId());
+ }
+ if (matcher.TabletId.Defined()) {
+ UNIT_ASSERT(event->Record.HasTabletId());
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.TabletId, event->Record.GetTabletId());
+ }
+ if (matcher.State.Defined()) {
+ UNIT_ASSERT(event->Record.HasActualState());
+ UNIT_ASSERT_EQUAL(*matcher.State, event->Record.GetActualState());
+ }
+}
+
+void TPQTabletFixture::StartPQWriteStateObserver()
+{
+ FoundPQWriteState = false;
+
+ auto observer = [this](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ if (auto* kvResponse = dynamic_cast<TEvKeyValue::TEvResponse*>(event->GetBase())) {
+ if ((event->Sender == event->Recipient) &&
+ kvResponse->Record.HasCookie() &&
+ (kvResponse->Record.GetCookie() == 4)) { // TPersQueue::WRITE_STATE_COOKIE
+ FoundPQWriteState = true;
+ }
+ }
+
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+ Ctx->Runtime->SetObserverFunc(observer);
+}
+
+void TPQTabletFixture::WaitForPQWriteState()
+{
+ TDispatchOptions options;
+ options.CustomFinalCondition = [this]() {
+ return FoundPQWriteState;
+ };
+ UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
+}
+
NHelpers::TPQTabletMock* TPQTabletFixture::CreatePQTabletMock(ui64 tabletId)
{
NHelpers::TPQTabletMock* mock = nullptr;
@@ -524,6 +621,116 @@ Y_UNIT_TEST_F(Partition_Send_Predicate_With_False, TPQTabletFixture)
//
}
+Y_UNIT_TEST_F(DropTablet_And_Tx, TPQTabletFixture)
+{
+ PQTabletPrepare({.partitions=2}, {}, *Ctx);
+
+ const ui64 txId_1 = 67890;
+ const ui64 txId_2 = 67891;
+
+ StartPQWriteStateObserver();
+
+ SendProposeTransactionRequest({.TxId=txId_1,
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ }});
+ SendDropTablet({.TxId=12345});
+
+ //
+ // транзакция TxId_1 будет обработана
+ //
+ WaitProposeTransactionResponse({.TxId=txId_1,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ WaitForPQWriteState();
+
+ //
+ // по транзакции TxId_2 получим отказ
+ //
+ SendProposeTransactionRequest({.TxId=txId_2,
+ .TxOps={
+ {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ }});
+ WaitProposeTransactionResponse({.TxId=txId_2,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::ABORTED});
+
+ SendPlanStep({.Step=100, .TxIds={txId_1}});
+
+ WaitPlanStepAck({.Step=100, .TxIds={txId_1}}); // TEvPlanStepAck для координатора
+ SendDropTablet({.TxId=67890}); // TEvDropTable когда выполняется транзакция
+ WaitPlanStepAccepted({.Step=100});
+
+ WaitProposeTransactionResponse({.TxId=txId_1,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
+
+ //
+ // ответы на TEvDropTablet будут после транзакции
+ //
+ WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=12345, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped});
+ WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=67890, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped});
+}
+
+Y_UNIT_TEST_F(DropTablet, TPQTabletFixture)
+{
+ PQTabletPrepare({.partitions=1}, {}, *Ctx);
+
+ //
+ // транзакций нет, ответ будет сразу
+ //
+ SendDropTablet({.TxId=99999});
+ WaitDropTabletReply({.Status=NKikimrProto::EReplyStatus::OK, .TxId=99999, .TabletId=Ctx->TabletId, .State=NKikimrPQ::EDropped});
+}
+
+Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture)
+{
+ PQTabletPrepare({.partitions=2}, {}, *Ctx);
+
+ const ui64 txId_1 = 67890;
+ const ui64 txId_2 = 67891;
+ const ui64 txId_3 = 67892;
+
+ StartPQWriteStateObserver();
+
+ //
+ // TEvDropTablet между транзакциями
+ //
+ SendProposeTransactionRequest({.TxId=txId_1,
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ }});
+ SendDropTablet({.TxId=12345});
+ SendProposeTransactionRequest({.TxId=txId_2,
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}
+ }});
+
+ WaitProposeTransactionResponse({.TxId=txId_1,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ WaitForPQWriteState();
+
+ SendProposeTransactionRequest({.TxId=txId_3,
+ .TxOps={
+ {.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
+ {.Partition=1, .Consumer="user", .Begin=0, .End=0, .Path="/topic"}
+ }});
+
+ //
+ // транзакция пришла после того как состояние было записано на диск. не будет обработана
+ //
+ WaitProposeTransactionResponse({.TxId=txId_3,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::ABORTED});
+
+ //
+ // транзакция пришла до того как состояние было записано на диск. будет обработана
+ //
+ WaitProposeTransactionResponse({.TxId=txId_2,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+}
+
}
}