aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-04-26 18:50:02 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-04-26 18:50:02 +0300
commitb7f8a0ad36be9d570bf335114b9ba65f0c0085ec (patch)
treee2f1a323eb54c3a333fdecd23161f7515124b3a1
parentfceb8891abb608f9dc4b914b4643eb208db4e545 (diff)
downloadydb-b7f8a0ad36be9d570bf335114b9ba65f0c0085ec.tar.gz
mirrorer: recreate read session instead verify
ref:03639b46b270a2fa078f6a9ce35e766d54996038
-rw-r--r--ydb/core/persqueue/mirrorer.cpp51
-rw-r--r--ydb/core/persqueue/mirrorer.h3
2 files changed, 33 insertions, 21 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index bf447903b21..65b9299bc55 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -97,10 +97,14 @@ void TMirrorer::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
bool TMirrorer::AddToWriteRequest(
NKikimrClient::TPersQueuePartitionRequest& request,
- TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message
+ TPersQueueReadEvent::TDataReceivedEvent::TCompressedMessage& message,
+ bool& incorrectRequest
) {
if (!request.HasCmdWriteOffset()) {
- Y_VERIFY(request.CmdWriteSize() == 0);
+ if (request.CmdWriteSize() > 0) {
+ incorrectRequest = true;
+ return false;
+ }
request.SetCmdWriteOffset(message.GetOffset(0));
}
if (request.GetCmdWriteOffset() + request.CmdWriteSize() != message.GetOffset(0)) {
@@ -327,10 +331,17 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) {
req->SetMessageNo(0);
req->SetCookie(WRITE_REQUEST_COOKIE);
- while (!Queue.empty() && AddToWriteRequest(*req, Queue.front())) {
+ bool incorrectRequest = false;
+ while (!Queue.empty() && AddToWriteRequest(*req, Queue.front(), incorrectRequest)) {
WriteInFlight.emplace_back(std::move(Queue.front()));
Queue.pop_front();
}
+ if (incorrectRequest) {
+ ProcessError(ctx, TStringBuilder() << " incorrect filling of CmdWrite request: "
+ << req->DebugString());
+ ScheduleConsumerCreation(ctx);
+ return;
+ }
WriteRequestInFlight = request->Record.GetPartitionRequest();
@@ -538,27 +549,28 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) {
AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages()));
} else if (auto* createStream = std::get_if<TPersQueueReadEvent::TCreatePartitionStreamEvent>(&event.GetRef())) {
- Y_VERIFY_S(
- !PartitionStream,
- MirrorerDescription()
- << " already has stream " << PartitionStream->GetPartitionStreamId()
- << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId()
- );
+ LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
+ MirrorerDescription() << " got create stream event for '" << createStream->DebugString()
+ << " and will set offset=" << OffsetToRead);
+ if (PartitionStream) {
+ ProcessError(ctx, TStringBuilder() << " already has stream " << PartitionStream->GetPartitionStreamId()
+ << ", new stream " << createStream->GetPartitionStream()->GetPartitionStreamId());
+ ScheduleConsumerCreation(ctx);
+ return;
+ }
+
PartitionStream = createStream->GetPartitionStream();
- Y_VERIFY_S(
- Partition == PartitionStream->GetPartitionId(),
- MirrorerDescription()
- << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath()
- << " partition=" << PartitionStream->GetPartitionId()
- );
+ if (Partition != PartitionStream->GetPartitionId()) {
+ ProcessError(ctx, TStringBuilder() << " got stream for incorrect partition, stream: topic=" << PartitionStream->GetTopicPath()
+ << " partition=" << PartitionStream->GetPartitionId());
+ ScheduleConsumerCreation(ctx);
+ return;
+ }
if (OffsetToRead < createStream->GetCommittedOffset()) {
ProcessError(ctx, TStringBuilder() << "stream has commit offset more then partition end offset,"
<< "gap will be created [" << OffsetToRead << ";" << createStream->GetCommittedOffset() << ")"
);
}
- LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
- MirrorerDescription() << " got create stream event for '" << createStream->DebugString()
- << " and will set offset=" << OffsetToRead);
createStream->Confirm(OffsetToRead, OffsetToRead);
RequestSourcePartitionStatus();
@@ -568,8 +580,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
PartitionStream.Reset();
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
MirrorerDescription()
- << " got destroy stream event for partition stream id: "
- << destroyStream->GetPartitionStream()->GetPartitionStreamId());
+ << " got destroy stream event: " << destroyStream->DebugString());
} else if (auto* streamClosed = std::get_if<TPersQueueReadEvent::TPartitionStreamClosedEvent>(&event.GetRef())) {
PartitionStream.Reset();
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER,
diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h
index b980379e5af..ca9739f577f 100644
--- a/ydb/core/persqueue/mirrorer.h
+++ b/ydb/core/persqueue/mirrorer.h
@@ -94,7 +94,8 @@ private:
bool AddToWriteRequest(
NKikimrClient::TPersQueuePartitionRequest& request,
- NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message
+ NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage& message,
+ bool& incorrectRequest
);
void ProcessError(const TActorContext& ctx, const TString& msg);
void ProcessError(const TActorContext& ctx, const TString& msg, const NKikimrClient::TResponse& response);