diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-06-01 00:40:03 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-06-01 00:40:03 +0300 |
commit | 1dcad18167f81e88877cc2750114183f538b4f96 (patch) | |
tree | 8fa6fd6ce705c29a153948f3868080b46308d2b9 | |
parent | cafa13684d1e4a4465f856dba34acaa751d37e1f (diff) | |
download | ydb-1dcad18167f81e88877cc2750114183f538b4f96.tar.gz |
logbroker mirrorer: add read timeout
ref:8b52d920c6864b800abce99f8f010dcd0d49ff77
-rw-r--r-- | ydb/core/persqueue/mirrorer.cpp | 14 | ||||
-rw-r--r-- | ydb/core/persqueue/mirrorer.h | 2 |
2 files changed, 15 insertions, 1 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index 6470e046538..e1dddfa1442 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -266,7 +266,9 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte << ", messages in write request " << WriteInFlight.size() << ", queue to write: " << Queue.size()); LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() - << "[STATE] wait new reader event=" << WaitNextReaderEventInFlight << ", read futures inflight " << ReadFuturesInFlight << ", last id=" << ReadFeatureId); + << "[STATE] wait new reader event=" << WaitNextReaderEventInFlight + << ", last received event time=" << LastReadEventTime + << ", read futures inflight " << ReadFuturesInFlight << ", last id=" << ReadFeatureId); if (!ReadFeatures.empty()) { const auto& oldest = *ReadFeatures.begin(); const auto& info = oldest.second; @@ -284,6 +286,13 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte InitTimeoutCounter.Inc(1); } StartInit(ctx); + return; + } + if (ReadSession && LastReadEventTime != TInstant::Zero() && ctx.Now() - LastReadEventTime > RECEIVE_READ_EVENT_TIMEOUT) { + ProcessError(ctx, TStringBuilder() << "receive read event timeout, last event was at " << LastReadEventTime + << " (" << ctx.Now() - LastReadEventTime << "). Read session will be recreated."); + ScheduleConsumerCreation(ctx); + return; } if (WriteRequestInFlight && WriteRequestTimestamp + WRITE_TIMEOUT < ctx.Now()) { LOG_ERROR_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " write request was sent at " @@ -292,6 +301,7 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte WriteTimeoutCounter.Inc(1); } ctx.Send(TabletActor, new TEvents::TEvPoisonPill()); + return; } DoProcessNextReaderEvent(ctx, true); // LOGBROKER-7430 @@ -469,6 +479,7 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) { ReadFuturesInFlight = 0; ReadFeatures.clear(); WaitNextReaderEventInFlight = false; + LastReadEventTime = TInstant::Zero(); Become(&TThis::StateInitConsumer); @@ -546,6 +557,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup) StartWaitNextReaderEvent(ctx); return; } + LastReadEventTime = ctx.Now(); if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) { AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages())); diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h index ca9739f577f..8dabe460e49 100644 --- a/ydb/core/persqueue/mirrorer.h +++ b/ydb/core/persqueue/mirrorer.h @@ -37,6 +37,7 @@ private: const TDuration LOG_STATE_INTERVAL = TDuration::Minutes(1); const TDuration INIT_TIMEOUT = TDuration::Minutes(1); + const TDuration RECEIVE_READ_EVENT_TIMEOUT = TDuration::Minutes(1); const TDuration WRITE_TIMEOUT = TDuration::Minutes(10); @@ -189,6 +190,7 @@ private: TMap<ui64, std::pair<TInstant, NThreading::TFuture<void>>> ReadFeatures; ui64 ReadFeatureId = 0; ui64 ReadFuturesInFlight = 0; + TInstant LastReadEventTime; }; }// NPQ |