diff options
author | alexnick <alexnick@ydb.tech> | 2022-09-21 11:09:33 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-09-21 11:09:33 +0300 |
commit | 2cf1d02878d0043c5c97d8f890efeb1c83da4a4c (patch) | |
tree | b8144ec73aa2c02945e69ec13886b5d8a2ee7731 | |
parent | 510ee3d03d00a5081280a750f14a220c18a91807 (diff) | |
download | ydb-2cf1d02878d0043c5c97d8f890efeb1c83da4a4c.tar.gz |
recreate session on stream closed event- SDK may stuck
-rw-r--r-- | ydb/core/persqueue/mirrorer.cpp | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index e7cb23ce46b..047e801640b 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -452,7 +452,7 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont factory->GetSharedActorSystem(), NKikimrServices::PQ_MIRRORER )); - + TString logPrefix = TStringBuilder() << MirrorerDescription() << "[reader " << ++ReaderGeneration << "] "; log.SetFormatter([logPrefix](ELogPriority, TStringBuf message) -> TString { return logPrefix + message; @@ -520,7 +520,7 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) { ReadFeatures.clear(); WaitNextReaderEventInFlight = false; LastReadEventTime = TInstant::Zero(); - + Become(&TThis::StateInitConsumer); LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " schedule consumer creation"); @@ -633,6 +633,11 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) << " got stream closed event for partition stream id: " << streamClosed->GetPartitionStream()->GetPartitionStreamId() << " reason: " << streamClosed->GetReason()); + + ProcessError(ctx, TStringBuilder() << " read session stream closed event"); + ScheduleConsumerCreation(ctx); + return; + } else if (auto* streamStatus = std::get_if<TPersQueueReadEvent::TPartitionStreamStatusEvent >(&event.GetRef())) { if (PartitionStream && PartitionStream->GetPartitionStreamId() == streamStatus->GetPartitionStream()->GetPartitionStreamId() |