aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-09-21 11:09:33 +0300
committeralexnick <alexnick@ydb.tech>2022-09-21 11:09:33 +0300
commit2cf1d02878d0043c5c97d8f890efeb1c83da4a4c (patch)
treeb8144ec73aa2c02945e69ec13886b5d8a2ee7731
parent510ee3d03d00a5081280a750f14a220c18a91807 (diff)
downloadydb-2cf1d02878d0043c5c97d8f890efeb1c83da4a4c.tar.gz
recreate session on stream closed event- SDK may stuck
-rw-r--r--ydb/core/persqueue/mirrorer.cpp9
1 files changed, 7 insertions, 2 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index e7cb23ce46b..047e801640b 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -452,7 +452,7 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
factory->GetSharedActorSystem(),
NKikimrServices::PQ_MIRRORER
));
-
+
TString logPrefix = TStringBuilder() << MirrorerDescription() << "[reader " << ++ReaderGeneration << "] ";
log.SetFormatter([logPrefix](ELogPriority, TStringBuf message) -> TString {
return logPrefix + message;
@@ -520,7 +520,7 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) {
ReadFeatures.clear();
WaitNextReaderEventInFlight = false;
LastReadEventTime = TInstant::Zero();
-
+
Become(&TThis::StateInitConsumer);
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " schedule consumer creation");
@@ -633,6 +633,11 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
<< " got stream closed event for partition stream id: "
<< streamClosed->GetPartitionStream()->GetPartitionStreamId()
<< " reason: " << streamClosed->GetReason());
+
+ ProcessError(ctx, TStringBuilder() << " read session stream closed event");
+ ScheduleConsumerCreation(ctx);
+ return;
+
} else if (auto* streamStatus = std::get_if<TPersQueueReadEvent::TPartitionStreamStatusEvent >(&event.GetRef())) {
if (PartitionStream
&& PartitionStream->GetPartitionStreamId() == streamStatus->GetPartitionStream()->GetPartitionStreamId()