aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-06-01 00:40:03 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-06-01 00:40:03 +0300
commit1dcad18167f81e88877cc2750114183f538b4f96 (patch)
tree8fa6fd6ce705c29a153948f3868080b46308d2b9
parentcafa13684d1e4a4465f856dba34acaa751d37e1f (diff)
downloadydb-1dcad18167f81e88877cc2750114183f538b4f96.tar.gz
logbroker mirrorer: add read timeout
ref:8b52d920c6864b800abce99f8f010dcd0d49ff77
-rw-r--r--ydb/core/persqueue/mirrorer.cpp14
-rw-r--r--ydb/core/persqueue/mirrorer.h2
2 files changed, 15 insertions, 1 deletions
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index 6470e046538..e1dddfa1442 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -266,7 +266,9 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte
<< ", messages in write request " << WriteInFlight.size()
<< ", queue to write: " << Queue.size());
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
- << "[STATE] wait new reader event=" << WaitNextReaderEventInFlight << ", read futures inflight " << ReadFuturesInFlight << ", last id=" << ReadFeatureId);
+ << "[STATE] wait new reader event=" << WaitNextReaderEventInFlight
+ << ", last received event time=" << LastReadEventTime
+ << ", read futures inflight " << ReadFuturesInFlight << ", last id=" << ReadFeatureId);
if (!ReadFeatures.empty()) {
const auto& oldest = *ReadFeatures.begin();
const auto& info = oldest.second;
@@ -284,6 +286,13 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte
InitTimeoutCounter.Inc(1);
}
StartInit(ctx);
+ return;
+ }
+ if (ReadSession && LastReadEventTime != TInstant::Zero() && ctx.Now() - LastReadEventTime > RECEIVE_READ_EVENT_TIMEOUT) {
+ ProcessError(ctx, TStringBuilder() << "receive read event timeout, last event was at " << LastReadEventTime
+ << " (" << ctx.Now() - LastReadEventTime << "). Read session will be recreated.");
+ ScheduleConsumerCreation(ctx);
+ return;
}
if (WriteRequestInFlight && WriteRequestTimestamp + WRITE_TIMEOUT < ctx.Now()) {
LOG_ERROR_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " write request was sent at "
@@ -292,6 +301,7 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte
WriteTimeoutCounter.Inc(1);
}
ctx.Send(TabletActor, new TEvents::TEvPoisonPill());
+ return;
}
DoProcessNextReaderEvent(ctx, true); // LOGBROKER-7430
@@ -469,6 +479,7 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) {
ReadFuturesInFlight = 0;
ReadFeatures.clear();
WaitNextReaderEventInFlight = false;
+ LastReadEventTime = TInstant::Zero();
Become(&TThis::StateInitConsumer);
@@ -546,6 +557,7 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
StartWaitNextReaderEvent(ctx);
return;
}
+ LastReadEventTime = ctx.Now();
if (auto* dataEvent = std::get_if<TPersQueueReadEvent::TDataReceivedEvent>(&event.GetRef())) {
AddMessagesToQueue(std::move(dataEvent->GetCompressedMessages()));
diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h
index ca9739f577f..8dabe460e49 100644
--- a/ydb/core/persqueue/mirrorer.h
+++ b/ydb/core/persqueue/mirrorer.h
@@ -37,6 +37,7 @@ private:
const TDuration LOG_STATE_INTERVAL = TDuration::Minutes(1);
const TDuration INIT_TIMEOUT = TDuration::Minutes(1);
+ const TDuration RECEIVE_READ_EVENT_TIMEOUT = TDuration::Minutes(1);
const TDuration WRITE_TIMEOUT = TDuration::Minutes(10);
@@ -189,6 +190,7 @@ private:
TMap<ui64, std::pair<TInstant, NThreading::TFuture<void>>> ReadFeatures;
ui64 ReadFeatureId = 0;
ui64 ReadFuturesInFlight = 0;
+ TInstant LastReadEventTime;
};
}// NPQ