aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-02-17 12:19:49 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-02-17 12:19:49 +0300
commitdaf18fc9b115222e95f2e08b4fec7130c2eba979 (patch)
tree9cffecbc50f4cb75ef6ea5e59e1d99265380f6d1
parent2c8e314f8fff8633fe2cf026badfbf6180845ae0 (diff)
downloadydb-daf18fc9b115222e95f2e08b4fec7130c2eba979.tar.gz
pq read limiter: lazy counter initialization LOGBROKER-6913
ref:79ac5104e30d08e2d40cd363cba0a4f22a7df0dc
-rw-r--r--ydb/core/persqueue/read_speed_limiter.cpp19
-rw-r--r--ydb/core/persqueue/read_speed_limiter.h2
2 files changed, 15 insertions, 6 deletions
diff --git a/ydb/core/persqueue/read_speed_limiter.cpp b/ydb/core/persqueue/read_speed_limiter.cpp
index 793338420d7..a8b3dd18214 100644
--- a/ydb/core/persqueue/read_speed_limiter.cpp
+++ b/ydb/core/persqueue/read_speed_limiter.cpp
@@ -55,12 +55,11 @@ TReadSpeedLimiter::TReadSpeedLimiter(
LimiterDescription() <<" kesus=" << KesusPath << " resource_path=" << QuotaResourcePath);
}
-
-void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) {
- Become(&TThis::StateWork);
- ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters);
-
- auto counters = AppData()->Counters;
+void TReadSpeedLimiter::InitCounters(const TActorContext& ctx) {
+ if (CountersInited) {
+ return;
+ }
+ auto counters = AppData(ctx)->Counters;
if (counters && TopicName.Contains("--")) {
QuotaWaitCounter.Reset(new TPercentileCounter(
GetServiceCounters(counters, "pqproxy|consumerReadQuotaWait"),
@@ -77,6 +76,13 @@ void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) {
true
));
}
+ CountersInited = true;
+}
+
+
+void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) {
+ Become(&TThis::StateWork);
+ ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters);
}
void TReadSpeedLimiter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
@@ -101,6 +107,7 @@ void TReadSpeedLimiter::HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvReque
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER,
LimiterDescription() << " quota required for cookie=" << ev->Get()->ReadRequest->Get()->Cookie
);
+ InitCounters(ctx);
bool hasActualErrors = ctx.Now() - LastReportedErrorTime <= DO_NOT_QUOTE_AFTER_ERROR_PERIOD;
if ((QuotaRequestInFlight || !InProcessReadRequestCookies.empty()) && !hasActualErrors) {
Queue.emplace_back(std::move(ev->Get()->ReadRequest), ctx.Now());
diff --git a/ydb/core/persqueue/read_speed_limiter.h b/ydb/core/persqueue/read_speed_limiter.h
index 23ca8dad955..cf91f7277e7 100644
--- a/ydb/core/persqueue/read_speed_limiter.h
+++ b/ydb/core/persqueue/read_speed_limiter.h
@@ -95,6 +95,7 @@ public:
);
void Bootstrap(const TActorContext& ctx);
+ void InitCounters(const TActorContext& ctx);
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
void HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr& ev, const TActorContext& ctx);
void HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx);
@@ -130,6 +131,7 @@ private:
TTabletCountersBase Counters;
THolder<TPercentileCounter> QuotaWaitCounter;
+ bool CountersInited = false;
TInstant LastReportedErrorTime;
};