diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-04-26 18:50:02 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-04-26 18:50:02 +0300 |
commit | b7f8a0ad36be9d570bf335114b9ba65f0c0085ec (patch) | |
tree | e2f1a323eb54c3a333fdecd23161f7515124b3a1 | |
parent | fceb8891abb608f9dc4b914b4643eb208db4e545 (diff) | |
download | ydb-b7f8a0ad36be9d570bf335114b9ba65f0c0085ec.tar.gz |
mirrorer: recreate read session instead verify
ref:03639b46b270a2fa078f6a9ce35e766d54996038
-rw-r--r-- | ydb/core/persqueue/mirrorer.cpp | 51 | ||||
-rw-r--r-- | ydb/core/persqueue/mirrorer.h | 3 |
2 files changed, 33 insertions, 21 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index bf447903b21..65b9299bc55 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -97,10 +97,14 @@ void TMirrorer::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) bool TMirrorer::AddToWriteRequest( NKikimrClient::TPersQueuePartitionRequest& request, - TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message + TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message, + bool& incorrectRequest ) { if (!request.HasCmdWriteOffset()) { - Y_VERIFY(request.CmdWriteSize() == 0); + if (request.CmdWriteSize() > 0) { + incorrectRequest = true; + return false; + } request.SetCmdWriteOffset(message.GetOffset(0)); } if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset(0)) { @@ -327,10 +331,17 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) { req->SetMessageNo(0); req->SetCookie(WRITE_REQUEST_COOKIE); - while (!Queue.empty() && AddToWriteRequest(*req, Queue.front())) { + bool incorrectRequest = false; + while (!Queue.empty() && AddToWriteRequest(*req, Queue.front(), incorrectRequest)) { WriteInFlight.emplace_back(std::move(Queue.front())); Queue.pop_front(); } + if (incorrectRequest) { + ProcessError(ctx, TStringBuilder() << " incorrect filling of CmdWrite request: " + << req->DebugString()); + ScheduleConsumerCreation(ctx); + return; + } WriteRequestInFlight = request->Record.GetPartitionRequest(); @@ -538,27 +549,28 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) { AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages())); } else if (auto* createStream = std::get_if<TPersQueueReadEvent::TCreatePartitionStreamEvent>(&event.GetRef())) { - Y_VERIFY_S( - !PartitionStream, - MirrorerDescription() - << " already has stream " << PartitionStream->GetPartitionStreamId() - << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId() - ); + LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, + MirrorerDescription() << " got create stream event for '" << createStream->DebugString() + << " and will set offset=" << OffsetToRead); + if (PartitionStream) { + ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionStreamId() + << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId()); + ScheduleConsumerCreation(ctx); + return; + } + PartitionStream = createStream->GetPartitionStream(); - Y_VERIFY_S( - Partition == PartitionStream->GetPartitionId(), - MirrorerDescription() - << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath() - << " partition=" << PartitionStream->GetPartitionId() - ); + if (Partition != PartitionStream->GetPartitionId()) { + ProcessError(ctx, TStringBuilder() << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath() + << " partition=" << PartitionStream->GetPartitionId()); + ScheduleConsumerCreation(ctx); + return; + } if (OffsetToRead < createStream->GetCommittedOffset()) { ProcessError(ctx, TStringBuilder() << "stream has commit offset more then partition end offset," << "gap will be created [" << OffsetToRead << ";" << createStream->GetCommittedOffset() << ")" ); } - LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, - MirrorerDescription() << " got create stream event for '" << createStream->DebugString() - << " and will set offset=" << OffsetToRead); createStream->Confirm(OffsetToRead, OffsetToRead); RequestSourcePartitionStatus(); @@ -568,8 +580,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) PartitionStream.Reset(); LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() - << " got destroy stream event for partition stream id: " - << destroyStream->GetPartitionStream()->GetPartitionStreamId()); + << " got destroy stream event: " << destroyStream->DebugString()); } else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionStreamClosedEvent>(&event.GetRef())) { PartitionStream.Reset(); LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h index b980379e5af..ca9739f577f 100644 --- a/ydb/core/persqueue/mirrorer.h +++ b/ydb/core/persqueue/mirrorer.h @@ -94,7 +94,8 @@ private: bool AddToWriteRequest( NKikimrClient::TPersQueuePartitionRequest& request, - NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message + NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message, + bool& incorrectRequest ); void ProcessError(const TActorContext& ctx, const TString& msg); void ProcessError(const TActorContext& ctx, const TString& msg, const NKikimrClient::TResponse& response); |