diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-02-17 12:19:49 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-02-17 12:19:49 +0300 |
commit | daf18fc9b115222e95f2e08b4fec7130c2eba979 (patch) | |
tree | 9cffecbc50f4cb75ef6ea5e59e1d99265380f6d1 | |
parent | 2c8e314f8fff8633fe2cf026badfbf6180845ae0 (diff) | |
download | ydb-daf18fc9b115222e95f2e08b4fec7130c2eba979.tar.gz |
pq read limiter: lazy counter initialization LOGBROKER-6913
ref:79ac5104e30d08e2d40cd363cba0a4f22a7df0dc
-rw-r--r-- | ydb/core/persqueue/read_speed_limiter.cpp | 19 | ||||
-rw-r--r-- | ydb/core/persqueue/read_speed_limiter.h | 2 |
2 files changed, 15 insertions, 6 deletions
diff --git a/ydb/core/persqueue/read_speed_limiter.cpp b/ydb/core/persqueue/read_speed_limiter.cpp index 793338420d7..a8b3dd18214 100644 --- a/ydb/core/persqueue/read_speed_limiter.cpp +++ b/ydb/core/persqueue/read_speed_limiter.cpp @@ -55,12 +55,11 @@ TReadSpeedLimiter::TReadSpeedLimiter( LimiterDescription() <<" kesus=" << KesusPath << " resource_path=" << QuotaResourcePath); } - -void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) { - Become(&TThis::StateWork); - ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); - - auto counters = AppData()->Counters; +void TReadSpeedLimiter::InitCounters(const TActorContext& ctx) { + if (CountersInited) { + return; + } + auto counters = AppData(ctx)->Counters; if (counters && TopicName.Contains("--")) { QuotaWaitCounter.Reset(new TPercentileCounter( GetServiceCounters(counters, "pqproxy|consumerReadQuotaWait"), @@ -77,6 +76,13 @@ void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) { true )); } + CountersInited = true; +} + + +void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateWork); + ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); } void TReadSpeedLimiter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { @@ -101,6 +107,7 @@ void TReadSpeedLimiter::HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvReque LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, LimiterDescription() << " quota required for cookie=" << ev->Get()->ReadRequest->Get()->Cookie ); + InitCounters(ctx); bool hasActualErrors = ctx.Now() - LastReportedErrorTime <= DO_NOT_QUOTE_AFTER_ERROR_PERIOD; if ((QuotaRequestInFlight || !InProcessReadRequestCookies.empty()) && !hasActualErrors) { Queue.emplace_back(std::move(ev->Get()->ReadRequest), ctx.Now()); diff --git a/ydb/core/persqueue/read_speed_limiter.h b/ydb/core/persqueue/read_speed_limiter.h index 23ca8dad955..cf91f7277e7 100644 --- a/ydb/core/persqueue/read_speed_limiter.h +++ b/ydb/core/persqueue/read_speed_limiter.h @@ -95,6 +95,7 @@ public: ); void Bootstrap(const TActorContext& ctx); + void InitCounters(const TActorContext& ctx); void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); void HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr& ev, const TActorContext& ctx); void HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx); @@ -130,6 +131,7 @@ private: TTabletCountersBase Counters; THolder<TPercentileCounter> QuotaWaitCounter; + bool CountersInited = false; TInstant LastReportedErrorTime; }; |