aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsavnik <savnik@yandex-team.com>2023-07-18 10:04:41 +0300
committersavnik <savnik@yandex-team.com>2023-07-18 10:04:41 +0300
commit75d2969cb699ede22cb09d3a6adab8906fc0ff3c (patch)
tree3e2cff5eaf6298daf3e784acff31eef33147a2e2
parentccbe76e66014692c9945391d3321d5bd491d022b (diff)
downloadydb-75d2969cb699ede22cb09d3a6adab8906fc0ff3c.tar.gz
Add total partition read quota
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin-x86_64.txt3
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-x86_64.txt3
-rw-r--r--ydb/core/persqueue/CMakeLists.windows-x86_64.txt3
-rw-r--r--ydb/core/persqueue/account_read_quoter.cpp (renamed from ydb/core/persqueue/read_speed_limiter.cpp)36
-rw-r--r--ydb/core/persqueue/account_read_quoter.h (renamed from ydb/core/persqueue/read_speed_limiter.h)26
-rw-r--r--ydb/core/persqueue/events/internal.h54
-rw-r--r--ydb/core/persqueue/partition.cpp53
-rw-r--r--ydb/core/persqueue/partition.h26
-rw-r--r--ydb/core/persqueue/partition_init.cpp17
-rw-r--r--ydb/core/persqueue/partition_read.cpp79
-rw-r--r--ydb/core/persqueue/quota_tracker.cpp12
-rw-r--r--ydb/core/persqueue/quota_tracker.h2
-rw-r--r--ydb/core/persqueue/read_quoter.cpp224
-rw-r--r--ydb/core/persqueue/read_quoter.h150
-rw-r--r--ydb/core/persqueue/user_info.cpp49
-rw-r--r--ydb/core/persqueue/user_info.h54
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp30
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h6
-rw-r--r--ydb/core/persqueue/ut/pq_ut.cpp66
-rw-r--r--ydb/core/persqueue/ut/resources/counters_labeled.json36
-rw-r--r--ydb/core/persqueue/ut/resources/counters_topics.html6
-rw-r--r--ydb/core/persqueue/ya.make3
-rw-r--r--ydb/core/protos/counters_pq.proto12
-rw-r--r--ydb/core/protos/pqconfig.proto2
-rw-r--r--ydb/core/tablet/private/aggregated_counters.cpp4
-rw-r--r--ydb/core/tablet/tablet_counters_aggregator.cpp2
-rw-r--r--ydb/library/services/services.proto3
-rw-r--r--ydb/public/api/protos/ydb_topic.proto3
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp6
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp26
31 files changed, 688 insertions, 311 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
index 955cb5e244a..04e59e4d917 100644
--- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
@@ -64,7 +64,8 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index 02ed0e22e2f..549b7a7193c 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -65,7 +65,8 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
index 02ed0e22e2f..549b7a7193c 100644
--- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
@@ -65,7 +65,8 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp
diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
index 955cb5e244a..04e59e4d917 100644
--- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
@@ -64,7 +64,8 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp
diff --git a/ydb/core/persqueue/read_speed_limiter.cpp b/ydb/core/persqueue/account_read_quoter.cpp
index 0707426fe1b..f4f26eb74b6 100644
--- a/ydb/core/persqueue/read_speed_limiter.cpp
+++ b/ydb/core/persqueue/account_read_quoter.cpp
@@ -1,4 +1,4 @@
-#include "read_speed_limiter.h"
+#include "account_read_quoter.h"
#include "event_helpers.h"
#include <ydb/core/base/appdata.h>
@@ -18,15 +18,15 @@ namespace NPQ {
const TDuration UPDATE_COUNTERS_INTERVAL = TDuration::Seconds(5);
const TDuration DO_NOT_QUOTE_AFTER_ERROR_PERIOD = TDuration::Seconds(5);
-const TString TReadSpeedLimiter::READ_QUOTA_ROOT_PATH = "read-quota";
+const TString TAccountReadQuoter::READ_QUOTA_ROOT_PATH = "read-quota";
-constexpr NKikimrServices::TActivity::EType TReadSpeedLimiter::ActorActivityType() {
- return NKikimrServices::TActivity::PERSQUEUE_READ_SPEED_LIMITER;
+constexpr NKikimrServices::TActivity::EType TAccountReadQuoter::ActorActivityType() {
+ return NKikimrServices::TActivity::PERSQUEUE_ACCOUNT_READ_QUOTER;
}
-TReadSpeedLimiter::TReadSpeedLimiter(
+TAccountReadQuoter::TAccountReadQuoter(
TActorId tabletActor,
- TActorId partitionActor,
+ TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
ui32 partition,
@@ -34,7 +34,7 @@ TReadSpeedLimiter::TReadSpeedLimiter(
const TTabletCountersBase& counters
)
: TabletActor(tabletActor)
- , PartitionActor(partitionActor)
+ , Recepient(recepient)
, TabletId(tabletId)
, TopicConverter(topicConverter)
, Partition(partition)
@@ -56,7 +56,7 @@ TReadSpeedLimiter::TReadSpeedLimiter(
LimiterDescription() <<" kesus=" << KesusPath << " resource_path=" << QuotaResourcePath);
}
-void TReadSpeedLimiter::InitCounters(const TActorContext& ctx) {
+void TAccountReadQuoter::InitCounters(const TActorContext& ctx) {
if (CountersInited) {
return;
}
@@ -81,12 +81,12 @@ void TReadSpeedLimiter::InitCounters(const TActorContext& ctx) {
}
-void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) {
+void TAccountReadQuoter::Bootstrap(const TActorContext& ctx) {
Become(&TThis::StateWork);
ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters);
}
-void TReadSpeedLimiter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
+void TAccountReadQuoter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, LimiterDescription() << " killed");
for (const auto& event : Queue) {
auto cookie = event.Event->Get()->Cookie;
@@ -99,12 +99,12 @@ void TReadSpeedLimiter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContex
Die(ctx);
}
-void TReadSpeedLimiter::HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr&, const TActorContext& ctx) {
+void TAccountReadQuoter::HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr&, const TActorContext& ctx) {
ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters);
- ctx.Send(PartitionActor, new NReadSpeedLimiterEvents::TEvCounters(Counters, User));
+ ctx.Send(Recepient, new NAccountReadQuoterEvents::TEvCounters(Counters, User));
}
-void TReadSpeedLimiter::HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx) {
+void TAccountReadQuoter::HandleReadQuotaRequest(NAccountReadQuoterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER,
LimiterDescription() << " quota required for cookie=" << ev->Get()->ReadRequest->Get()->Cookie
);
@@ -117,7 +117,7 @@ void TReadSpeedLimiter::HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvReque
}
}
-void TReadSpeedLimiter::HandleReadQuotaConsumed(NReadSpeedLimiterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx) {
+void TAccountReadQuoter::HandleReadQuotaConsumed(NAccountReadQuoterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx) {
ConsumedBytesInCredit += ev->Get()->ReadBytes;
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, LimiterDescription()
<< "consumed read quota " << ev->Get()->ReadBytes
@@ -147,7 +147,7 @@ void TReadSpeedLimiter::HandleReadQuotaConsumed(NReadSpeedLimiterEvents::TEvCons
}
}
-void TReadSpeedLimiter::HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx) {
+void TAccountReadQuoter::HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx) {
QuotaRequestInFlight = false;
const ui64 cookie = ev->Cookie;
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER,
@@ -170,21 +170,21 @@ void TReadSpeedLimiter::HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const
}
}
-void TReadSpeedLimiter::TReadSpeedLimiter::ApproveRead(TEvPQ::TEvRead::TPtr ev, TInstant startWait, const TActorContext& ctx) {
+void TAccountReadQuoter::TAccountReadQuoter::ApproveRead(TEvPQ::TEvRead::TPtr ev, TInstant startWait, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER,
LimiterDescription() << " approve read for cookie=" << ev->Get()->Cookie
);
InProcessReadRequestCookies.insert(ev->Get()->Cookie);
auto waitTime = ctx.Now() - startWait;
- Send(PartitionActor, new NReadSpeedLimiterEvents::TEvResponse(ev.Release(), waitTime));
+ Send(Recepient, new NAccountReadQuoterEvents::TEvResponse(ev.Release(), waitTime));
if (QuotaWaitCounter) {
QuotaWaitCounter->IncFor(waitTime.MilliSeconds());
}
}
-TString TReadSpeedLimiter::LimiterDescription() const {
+TString TAccountReadQuoter::LimiterDescription() const {
return TStringBuilder() << "topic=" << TopicConverter->GetClientsideName() << ":" << Partition << " user=" << User << ": ";
}
diff --git a/ydb/core/persqueue/read_speed_limiter.h b/ydb/core/persqueue/account_read_quoter.h
index 624e7416a21..bfd223c5168 100644
--- a/ydb/core/persqueue/read_speed_limiter.h
+++ b/ydb/core/persqueue/account_read_quoter.h
@@ -11,8 +11,8 @@ namespace NPQ {
class TPercentileCounter;
-namespace NReadSpeedLimiterEvents {
- struct TEvRequest : public TEventLocal<TEvRequest, TEvPQ::EvReadLimiterRequest> {
+namespace NAccountReadQuoterEvents {
+ struct TEvRequest : public TEventLocal<TEvRequest, TEvPQ::EvAccountReadQuotaRequest> {
TEvRequest(TEvPQ::TEvRead::TPtr readRequest)
: ReadRequest(std::move(readRequest))
{}
@@ -20,7 +20,7 @@ namespace NReadSpeedLimiterEvents {
TEvPQ::TEvRead::TPtr ReadRequest;
};
- struct TEvResponse : public TEventLocal<TEvResponse, TEvPQ::EvReadLimiterResponse> {
+ struct TEvResponse : public TEventLocal<TEvResponse, TEvPQ::EvAccountReadQuotaResponse> {
TEvResponse(TEvPQ::TEvRead::TPtr readRequest, TDuration waitTime)
: ReadRequest(std::move(readRequest))
, WaitTime(waitTime)
@@ -30,7 +30,7 @@ namespace NReadSpeedLimiterEvents {
TDuration WaitTime;
};
- struct TEvConsumed : public TEventLocal<TEvConsumed, TEvPQ::EvReadLimiterConsumed> {
+ struct TEvConsumed : public TEventLocal<TEvConsumed, TEvPQ::EvAccountReadQuotaConsumed> {
TEvConsumed(ui64 readBytes, ui64 readRequestCookie)
: ReadBytes(readBytes)
, ReadRequestCookie(readRequestCookie)
@@ -40,7 +40,7 @@ namespace NReadSpeedLimiterEvents {
ui64 ReadRequestCookie;
};
- struct TEvCounters : public TEventLocal<TEvCounters, TEvPQ::EvReadLimiterCounters> {
+ struct TEvCounters : public TEventLocal<TEvCounters, TEvPQ::EvAccountReadQuotaCounters> {
TEvCounters(const NKikimr::TTabletCountersBase& counters, const TString& user)
: User(user)
{
@@ -52,7 +52,7 @@ namespace NReadSpeedLimiterEvents {
};
}
-class TReadSpeedLimiter : public TActorBootstrapped<TReadSpeedLimiter> {
+class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> {
private:
static const TString READ_QUOTA_ROOT_PATH;
@@ -72,8 +72,8 @@ private:
TRACE_EVENT(NKikimrServices::PQ_READ_SPEED_LIMITER);
switch (ev->GetTypeRewrite()) {
HFuncTraced(TEvPQ::TEvUpdateCounters, HandleUpdateCounters);
- HFuncTraced(NReadSpeedLimiterEvents::TEvRequest, HandleReadQuotaRequest);
- HFuncTraced(NReadSpeedLimiterEvents::TEvConsumed, HandleReadQuotaConsumed);
+ HFuncTraced(NAccountReadQuoterEvents::TEvRequest, HandleReadQuotaRequest);
+ HFuncTraced(NAccountReadQuoterEvents::TEvConsumed, HandleReadQuotaConsumed);
HFuncTraced(TEvQuota::TEvClearance, HandleClearance);
HFuncTraced(TEvents::TEvPoisonPill, Handle);
default:
@@ -84,9 +84,9 @@ private:
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType();
- TReadSpeedLimiter(
+ TAccountReadQuoter(
TActorId tabletActor,
- TActorId partitionActor,
+ TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
ui32 partition,
@@ -98,8 +98,8 @@ public:
void InitCounters(const TActorContext& ctx);
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
void HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr& ev, const TActorContext& ctx);
- void HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx);
- void HandleReadQuotaConsumed(NReadSpeedLimiterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx);
+ void HandleReadQuotaRequest(NAccountReadQuoterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx);
+ void HandleReadQuotaConsumed(NAccountReadQuoterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx);
void HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx);
void ApproveRead(TEvPQ::TEvRead::TPtr ev, TInstant startWait, const TActorContext& ctx);
@@ -109,7 +109,7 @@ private:
private:
const TActorId TabletActor;
- const TActorId PartitionActor;
+ const TActorId Recepient;
const ui64 TabletId;
const NPersQueue::TTopicConverterPtr TopicConverter;
const ui32 Partition;
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 56d77cc2d43..7c5e381ab29 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -107,10 +107,10 @@ struct TEvPQ {
EvSplitMessageGroup,
EvUpdateCounters,
EvMirrorerCounters,
- EvReadLimiterRequest,
- EvReadLimiterResponse,
- EvReadLimiterConsumed,
- EvReadLimiterCounters,
+ EvAccountReadQuotaRequest,
+ EvAccountReadQuotaResponse,
+ EvAccountReadQuotaConsumed,
+ EvAccountReadQuotaCounters,
EvRetryWrite,
EvInitCredentials,
EvCredentialsCreated,
@@ -128,6 +128,12 @@ struct TEvPQ {
EvPartitionConfigChanged,
EvSubDomainStatus,
EvStatsWakeup,
+ EvRequestQuota,
+ EvApproveQuota,
+ EvConsumed,
+ EvQuotaUpdated,
+ EvQuotaCountersUpdated,
+ EvConsumerRemoved,
EvEnd
};
@@ -794,6 +800,46 @@ struct TEvPQ {
ui64 Round;
};
+
+ struct TEvRequestQuota : public TEventLocal<TEvRequestQuota, EvRequestQuota> {
+ TEvRequestQuota(TEvPQ::TEvRead::TPtr readRequest)
+ :
+ ReadRequest(std::move(readRequest))
+ {}
+
+ TEvPQ::TEvRead::TPtr ReadRequest;
+ };
+
+ struct TEvApproveQuota : public TEventLocal<TEvApproveQuota, EvApproveQuota> {
+ TEvApproveQuota(TEvPQ::TEvRead::TPtr readRequest, TDuration waitTime)
+ :
+ ReadRequest(std::move(readRequest)),
+ WaitTime(std::move(waitTime))
+ {}
+
+ TEvPQ::TEvRead::TPtr ReadRequest;
+ TDuration WaitTime;
+ };
+
+ struct TEvConsumed : public TEventLocal<TEvConsumed, EvConsumed> {
+ TEvConsumed(ui64 readBytes, ui64 readRequestCookie, const TString& consumer)
+ : ReadBytes(readBytes),
+ ReadRequestCookie(readRequestCookie),
+ Consumer(consumer)
+ {}
+
+ ui64 ReadBytes;
+ ui64 ReadRequestCookie;
+ TString Consumer;
+ };
+
+ struct TEvConsumerRemoved : public TEventLocal<TEvConsumerRemoved, EvConsumerRemoved> {
+ TEvConsumerRemoved(const TString& consumer)
+ : Consumer(consumer)
+ {}
+
+ TString Consumer;
+ };
};
} //NKikimr
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index faebc0aff8d..facb975b521 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -139,6 +139,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co
, DiskIsFull(false)
, SubDomainOutOfSpace(subDomainOutOfSpace)
, HasDataReqNum(0)
+ , AvgReadBytes(TDuration::Minutes(1), 1000)
, AvgWriteBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}}
, AvgQuotaBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}}
, ReservedSize(0)
@@ -194,7 +195,6 @@ ui64 TPartition::UsedReserveSize(const TActorContext& ctx) const {
return std::min<ui64>(MeteringDataSize(ctx), ReserveSize());
}
-
ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
const auto now = ctx.Now();
const auto duration = now - LastUsedStorageMeterTimestamp;
@@ -408,6 +408,8 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
UsersInfoStorage->Clear(ctx);
}
+ Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
+
Die(ctx);
}
@@ -572,7 +574,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
for (ui32 i = 0; i < 4; ++i) {
resSpeed[i] += userInfo.AvgReadBytes[i].GetValue();
}
- maxQuota += userInfo.ReadQuota.GetTotalSpeed();
+ maxQuota += userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get();
}
if (ev->Get()->ClientId == userInfo.User) { //fill lags
NKikimrPQ::TClientInfo* clientInfo = result.MutableLagsInfo();
@@ -895,7 +897,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
}
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
- OnReadRequestFinished(std::move(info), answer.Size);
+ OnReadRequestFinished(cookie, answer.Size, info.User, ctx);
}
void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {
@@ -1056,12 +1058,6 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) {
userInfo.LabeledCounters->GetCounters()[METRIC_USER_PARTITIONS].Set(1);
}
- ui64 speed = userInfo.ReadQuota.GetTotalSpeed();
- if (speed != userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_BYTES].Get()) {
- haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_BYTES].Set(speed);
- }
-
ui64 readOffsetRewindSum = userInfo.ReadOffsetRewindSum;
if (readOffsetRewindSum != userInfo.LabeledCounters->GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Get()) {
haveChanges = true;
@@ -1079,17 +1075,18 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) {
id += 2;
}
Y_VERIFY(id == METRIC_MAX_READ_SPEED_4 + 1);
- if (userInfo.ReadQuota.GetTotalSpeed()) {
- ui64 quotaUsage = ui64(userInfo.AvgReadBytes[1].GetValue()) * 1000000 / userInfo.ReadQuota.GetTotalSpeed() / 60;
- if (quotaUsage != userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_USAGE].Get()) {
+ if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get()) {
+ ui64 quotaUsage = ui64(userInfo.AvgReadBytes[1].GetValue()) * 1000000 / userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get() / 60;
+ if (quotaUsage != userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get()) {
haveChanges = true;
- userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_USAGE].Set(quotaUsage);
+ userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage);
}
}
if (haveChanges) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
}
}
+
bool haveChanges = false;
if (SourceIdStorage.GetInMemorySourceIds().size() != PartitionCountersLabeled->GetCounters()[METRIC_MAX_NUM_SIDS].Get()) {
haveChanges = true;
@@ -1189,6 +1186,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) {
haveChanges = true;
PartitionCountersLabeled->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Set(timeLag);
}
+
+ if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get()) {
+ ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60;
+ if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) {
+ haveChanges = true;
+ PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
+ }
+ }
return haveChanges;
}
@@ -1198,6 +1203,16 @@ void TPartition::ReportCounters(const TActorContext& ctx) {
}
}
+void TPartition::Handle(NReadQuoterEvents::TEvQuotaUpdated::TPtr& ev, const TActorContext&) {
+ for (auto& [consumerStr, quota] : ev->Get()->UpdatedConsumerQuotas) {
+ TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumerStr);
+ if (userInfo) {
+ userInfo->LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Set(quota);
+ }
+ }
+ PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Set(ev->Get()->UpdatedTotalPartitionQuota);
+}
+
void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
auto& response = ev->Get()->Record;
@@ -1599,17 +1614,9 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
UsersInfoStorage->UpdateConfig(Config);
- WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond());
- if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
- for (auto& userInfo : UsersInfoStorage->GetAll()) {
- userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2);
- }
- }
+ WriteQuota->UpdateConfigIfChanged(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond());
- for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
- auto& userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
- userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
- }
+ Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
if (Config.GetPartitionConfig().HasMirrorFrom()) {
if (Mirrorer) {
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 5784df51e90..612bb8631c3 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -10,6 +10,7 @@
#include "subscriber.h"
#include "user_info.h"
#include "utils.h"
+#include "read_quoter.h"
#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
@@ -108,8 +109,9 @@ private:
void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
void FilterDeadlinedWrites(const TActorContext& ctx);
- void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx);
- void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(NReadQuoterEvents::TEvQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx);
+ void Handle(NReadQuoterEvents::TEvQuotaUpdated::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvApproveQuota::TPtr& ev, const TActorContext& ctx);
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx);
@@ -162,7 +164,7 @@ private:
void LogAndCollectError(NKikimrServices::EServiceKikimr service, const TString& msg, const TActorContext& ctx);
void LogAndCollectError(const NKikimrPQ::TStatusResponse::TErrorMessage& error, const TActorContext& ctx);
- void OnReadRequestFinished(TReadInfo&& info, ui64 answerSize);
+ void OnReadRequestFinished(ui64 cookie, ui64 answerSize, const TString& consumer, const TActorContext& ctx);
void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
void ProcessChangeOwnerRequests(const TActorContext& ctx);
@@ -386,13 +388,14 @@ private:
HFuncTraced(TEvPersQueue::TEvReportPartitionError, Handle);
HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
- HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle);
HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
HFuncTraced(TEvPQ::TEvTxCalcPredicate, HandleOnInit);
HFuncTraced(TEvPQ::TEvProposePartitionConfig, HandleOnInit);
HFuncTraced(TEvPQ::TEvTxCommit, HandleOnInit);
HFuncTraced(TEvPQ::TEvTxRollback, HandleOnInit);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
+ HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
+ HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
default:
if (!Initializer.Handle(ev)) {
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -414,7 +417,7 @@ private:
HFuncTraced(TEvPQ::TEvBlobResponse, Handle);
HFuncTraced(TEvPQ::TEvWrite, HandleOnIdle);
HFuncTraced(TEvPQ::TEvRead, Handle);
- HFuncTraced(NReadSpeedLimiterEvents::TEvResponse, Handle);
+ HFuncTraced(TEvPQ::TEvApproveQuota, Handle);
HFuncTraced(TEvPQ::TEvReadTimeout, Handle);
HFuncTraced(TEvents::TEvPoisonPill, Handle);
HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring);
@@ -429,7 +432,6 @@ private:
HFuncTraced(TEvPQ::TEvChangeOwner, Handle);
HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
- HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle);
HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
HFuncTraced(TEvPQ::TEvError, Handle);
HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
@@ -447,7 +449,8 @@ private:
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
-
+ HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
+ HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
default:
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
break;
@@ -468,7 +471,7 @@ private:
HFuncTraced(TEvPQ::TEvBlobResponse, Handle);
HFuncTraced(TEvPQ::TEvWrite, HandleOnWrite);
HFuncTraced(TEvPQ::TEvRead, Handle);
- HFuncTraced(NReadSpeedLimiterEvents::TEvResponse, Handle);
+ HFuncTraced(TEvPQ::TEvApproveQuota, Handle);
HFuncTraced(TEvPQ::TEvReadTimeout, Handle);
HFuncTraced(TEvents::TEvPoisonPill, Handle);
HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring);
@@ -483,7 +486,6 @@ private:
HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle);
HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
- HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle);
HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
HFuncTraced(TEvPQ::TEvError, Handle);
HFuncTraced(TEvPQ::TEvReserveBytes, Handle);
@@ -501,7 +503,8 @@ private:
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
-
+ HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
+ HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
default:
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateWrite", ev));
break;
@@ -626,7 +629,10 @@ private:
TSet<THasDataDeadline> HasDataDeadlines;
ui64 HasDataReqNum;
+ NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>> AvgReadBytes;
+
TMaybe<TQuotaTracker> WriteQuota;
+ TActorId ReadQuotaTrackerActor;
THolder<TPercentileCounter> PartitionWriteQuotaWaitCounter;
TInstant QuotaDeadline = TInstant::Zero();
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index d47b026edad..56cdd629e37 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -663,6 +663,16 @@ void TPartition::Initialize(const TActorContext& ctx) {
WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(),
Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(),
ctx.Now());
+ ReadQuotaTrackerActor = Register(new TReadQuoter(
+ ctx,
+ SelfId(),
+ TopicConverter,
+ Config,
+ Partition,
+ Tablet,
+ TabletID,
+ Counters
+ ));
WriteTimestamp = ctx.Now();
LastUsedStorageMeterTimestamp = ctx.Now();
WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero();
@@ -681,10 +691,8 @@ void TPartition::Initialize(const TActorContext& ctx) {
TopicWriteQuotaResourcePath);
UsersInfoStorage.ConstructInPlace(DCId,
- TabletID,
TopicConverter,
Partition,
- Counters,
Config,
CloudId,
DbId,
@@ -730,11 +738,6 @@ void TPartition::Initialize(const TActorContext& ctx) {
DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i]));
}
- for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
- auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx);
- userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
- }
-
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID);
if (AppData(ctx)->Counters) {
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index 6b7e14c0327..7a30acc2ec9 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -128,20 +128,6 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) {
auto now = ctx.Now();
WriteQuota->Update(now);
- for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
- while (true) {
- if (!userInfo.ReadQuota.CanExaust(now) && !userInfo.ReadRequests.empty()) {
- break;
- }
- if (!userInfo.ReadRequests.empty()) {
- auto ri(std::move(userInfo.ReadRequests.front().first));
- auto cookie = userInfo.ReadRequests.front().second;
- userInfo.ReadRequests.pop_front();
- ProcessRead(ctx, std::move(ri), cookie, false);
- } else
- break;
- }
- }
ScheduleUpdateAvailableSize(ctx);
}
@@ -176,13 +162,8 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
}
}
-void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& /*ctx*/) {
- auto userInfo = UsersInfoStorage->GetIfExists(ev->Get()->User);
- if (userInfo && userInfo->ReadSpeedLimiter) {
- auto diff = ev->Get()->Counters.MakeDiffForAggr(userInfo->ReadSpeedLimiter->Baseline);
- TabletCounters.Populate(*diff.Get());
- ev->Get()->Counters.RememberCurrentStateAsBaseline(userInfo->ReadSpeedLimiter->Baseline);
- }
+void TPartition::Handle(NReadQuoterEvents::TEvQuotaCountersUpdated::TPtr& ev, const TActorContext& /*ctx*/) {
+ TabletCounters.Populate(*ev->Get()->Counters.Get());
}
void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
@@ -513,7 +494,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
auto& userInfo = UsersInfoStorage->GetOrCreate(res->User, ctx);
userInfo.ForgetSubscription(ctx.Now());
- OnReadRequestFinished(std::move(res.GetRef()), answer.Size);
+ OnReadRequestFinished(res->Destination, answer.Size, res->User, ctx);
}
@@ -678,30 +659,31 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) {
return;
}
}
-
- if (userInfo.ReadSpeedLimiter) {
- Send(userInfo.ReadSpeedLimiter->Actor, new NReadSpeedLimiterEvents::TEvRequest(ev.Release()));
- } else {
- DoRead(ev.Release(), TDuration::Zero(), ctx);
- }
+ userInfo.ReadsInQuotaQueue++;
+ Send(ReadQuotaTrackerActor, new TEvPQ::TEvRequestQuota(ev.Release()));
}
-void TPartition::Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx) {
+void TPartition::Handle(TEvPQ::TEvApproveQuota::TPtr& ev, const TActorContext& ctx) {
DoRead(ev->Get()->ReadRequest.Release(), ev->Get()->WaitTime, ctx);
}
void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx) {
auto read = ev->Get();
const TString& user = read->ClientId;
- auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx);
-
+ auto userInfo = UsersInfoStorage->GetIfExists(user);
+ if(!userInfo) {
+ ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "cannot finish read request. Consumer " << read->ClientId << " is gone from partition");
+ Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
+ return;
+ }
+ userInfo->ReadsInQuotaQueue--;
ui64 offset = read->Offset;
- if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) {
+ if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) {
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
- timestamp = Max(timestamp, userInfo.ReadFromTimestamp);
+ timestamp = Max(timestamp, userInfo->ReadFromTimestamp);
offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset);
- userInfo.ReadOffsetRewindSum += offset - read->Offset;
+ userInfo->ReadOffsetRewindSum += offset - read->Offset;
}
TReadInfo info(user, read->ClientDC, offset, read->PartNo, read->Count, read->Size, read->Cookie, read->ReadTimestampMs, waitQuotaTime);
@@ -729,8 +711,8 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
read->Timeout = 30000;
}
Subscriber.AddSubscription(std::move(info), read->Timeout, cookie, ctx);
- ++userInfo.Subscriptions;
- userInfo.UpdateReadOffset((i64)offset - 1, userInfo.WriteTimestamp, userInfo.CreateTimestamp, ctx.Now());
+ ++userInfo->Subscriptions;
+ userInfo->UpdateReadOffset((i64)offset - 1, userInfo->WriteTimestamp, userInfo->CreateTimestamp, ctx.Now());
return;
}
@@ -740,20 +722,9 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const
ProcessRead(ctx, std::move(info), cookie, false);
}
-void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) {
- auto userInfo = UsersInfoStorage->GetIfExists(info.User);
- Y_VERIFY(userInfo);
-
- if (Config.GetMeteringMode() != NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY) {
- return;
- }
-
- if (userInfo->ReadSpeedLimiter) {
- Send(
- userInfo->ReadSpeedLimiter->Actor,
- new NReadSpeedLimiterEvents::TEvConsumed(answerSize, info.Destination)
- );
- }
+void TPartition::OnReadRequestFinished(ui64 cookie, ui64 answerSize, const TString& consumer, const TActorContext& ctx) {
+ AvgReadBytes.Update(answerSize, ctx.Now());
+ Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumed(answerSize, cookie, consumer));
}
void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo, const TActorContext& ctx) {
@@ -954,6 +925,7 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
}
UsersInfoStorage->Remove(user, ctx);
+ Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user));
}
}
@@ -989,11 +961,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
userInfo.ForgetSubscription(ctx.Now());
}
- if (!userInfo.ReadQuota.CanExaust(ctx.Now())) {
- userInfo.ReadRequests.push_back({std::move(info), cookie});
- userInfo.UpdateReadingTimeAndState(ctx.Now());
- return;
- }
TVector<TRequestedBlob> blobs = GetReadRequestFromBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size);
info.Blobs = blobs;
ui64 lastOffset = info.Offset + Min(count, info.Count);
@@ -1025,7 +992,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize());
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
- OnReadRequestFinished(std::move(info), answer.Size);
+ OnReadRequestFinished(cookie, answer.Size, info.User, ctx);
return;
}
diff --git a/ydb/core/persqueue/quota_tracker.cpp b/ydb/core/persqueue/quota_tracker.cpp
index 9d8d0722cf0..11dd188424a 100644
--- a/ydb/core/persqueue/quota_tracker.cpp
+++ b/ydb/core/persqueue/quota_tracker.cpp
@@ -9,10 +9,14 @@ namespace NKikimr::NPQ {
, MaxBurst(maxBurst)
{}
- void TQuotaTracker::UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
- SpeedPerSecond = speedPerSecond;
- MaxBurst = maxBurst;
- AvailableSize = maxBurst;
+ bool TQuotaTracker::UpdateConfigIfChanged(const ui64 maxBurst, const ui64 speedPerSecond) {
+ if (maxBurst != MaxBurst || speedPerSecond != SpeedPerSecond) {
+ SpeedPerSecond = speedPerSecond;
+ MaxBurst = maxBurst;
+ AvailableSize = maxBurst;
+ return true;
+ }
+ return false;
}
void TQuotaTracker::Update(const TInstant timestamp) {
diff --git a/ydb/core/persqueue/quota_tracker.h b/ydb/core/persqueue/quota_tracker.h
index 82dcfba9ecc..f5a004b2ccc 100644
--- a/ydb/core/persqueue/quota_tracker.h
+++ b/ydb/core/persqueue/quota_tracker.h
@@ -8,7 +8,7 @@ namespace NKikimr::NPQ {
public:
TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant timestamp);
- void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond);
+ bool UpdateConfigIfChanged(const ui64 maxBurst, const ui64 speedPerSecond);
void Update(const TInstant timestamp);
bool CanExaust(const TInstant timestamp) ;
diff --git a/ydb/core/persqueue/read_quoter.cpp b/ydb/core/persqueue/read_quoter.cpp
new file mode 100644
index 00000000000..2ec3c939759
--- /dev/null
+++ b/ydb/core/persqueue/read_quoter.cpp
@@ -0,0 +1,224 @@
+#include "read_quoter.h"
+#include "account_read_quoter.h"
+
+
+namespace NKikimr {
+namespace NPQ {
+
+const TDuration WAKE_UP_TIMEOUT = TDuration::Seconds(5);
+const ui64 DEFAULT_READ_SPEED_AND_BURST = 1'000'000'000;
+
+void TReadQuoter::Bootstrap(const TActorContext &ctx) {
+ Become(&TThis::StateWork);
+ ScheduleWakeUp(ctx);
+
+ //UpdateConsumersWithCustomQuota(ctx); // depricated. Delete this after 01.10.2023
+}
+
+void TReadQuoter::HandleQuotaRequest(TEvPQ::TEvRequestQuota::TPtr& ev, const TActorContext& ctx) {
+ TConsumerReadQuota* consumer = GetOrCreateConsumerQuota(ev->Get()->ReadRequest->Get()->ClientId, ctx);
+ QuotaRequestedTimes.emplace(ev->Get()->ReadRequest->Cookie, ctx.Now());
+
+ if (consumer->ReadSpeedLimiter) {
+ Send(consumer->ReadSpeedLimiter->Actor, new NAccountReadQuoterEvents::TEvRequest(ev->Get()->ReadRequest.Release()));
+ } else {
+ CheckConsumerPerPartitionQuota(ev->Get()->ReadRequest, ctx);
+ }
+}
+
+void TReadQuoter::HandleAccountQuotaApproved(NAccountReadQuoterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx) {
+ CheckConsumerPerPartitionQuota(ev->Get()->ReadRequest, ctx);
+}
+
+void TReadQuoter::CheckConsumerPerPartitionQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx) {
+ TConsumerReadQuota* consumerQuota = GetOrCreateConsumerQuota(ev->Get()->ClientId, ctx);
+ if (!consumerQuota->PartitionPerConsumerQuotaTracker.CanExaust(ctx.Now())) {
+ consumerQuota->ReadRequests.push_back(ev);
+ return;
+ }
+ CheckTotalPartitionQuota(ev, ctx);
+}
+
+void TReadQuoter::CheckTotalPartitionQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx) {
+ if (!PartitionTotalQuotaTracker.CanExaust(ctx.Now())) {
+ ReadRequests.push_back(ev);
+ return;
+ }
+ ApproveQuota(ev, ctx);
+}
+
+void TReadQuoter::ApproveQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx) {
+ auto waitTime = TDuration::Zero();
+ auto waitTimeIter = QuotaRequestedTimes.find(ev->Get()->Cookie);
+ if (waitTimeIter != QuotaRequestedTimes.end()) {
+ waitTime = ctx.Now() - waitTimeIter->second;
+ QuotaRequestedTimes.erase(waitTimeIter);
+ }
+ Send(PartitionActor, new TEvPQ::TEvApproveQuota(ev, waitTime));
+}
+
+void TReadQuoter::HandleWakeUp(TEvents::TEvWakeup::TPtr&, const TActorContext& ctx) {
+ for (auto& [consumerStr, consumer] : ConsumerQuotas) {
+ while (!consumer.ReadRequests.empty() && consumer.PartitionPerConsumerQuotaTracker.CanExaust(ctx.Now())) {
+ auto readEvent(std::move(consumer.ReadRequests.front()));
+ consumer.ReadRequests.pop_front();
+ CheckTotalPartitionQuota(readEvent, ctx);
+ }
+ }
+ while (!ReadRequests.empty() && PartitionTotalQuotaTracker.CanExaust(ctx.Now())) {
+ auto readEvent(std::move(ReadRequests.front()));
+ ReadRequests.pop_front();
+ ApproveQuota(readEvent, ctx);
+ }
+ ScheduleWakeUp(ctx);
+}
+
+void TReadQuoter::HandleConsumerRemoved(TEvPQ::TEvConsumerRemoved::TPtr& ev, const TActorContext&) {
+ auto it = ConsumerQuotas.find(ev->Get()->Consumer);
+ if(it != ConsumerQuotas.end()) {
+ if(it->second.ReadSpeedLimiter) {
+ Send(it->second.ReadSpeedLimiter->Actor, new TEvents::TEvPoisonPill());
+ }
+ ConsumerQuotas.erase(it);
+ }
+}
+
+void TReadQuoter::HandleConsumed(TEvPQ::TEvConsumed::TPtr& ev,const TActorContext& ctx) {
+ PartitionTotalQuotaTracker.Exaust(ev->Get()->ReadBytes, ctx.Now());
+ auto consumerQuota = GetIfExists(ev->Get()->Consumer);
+ if(!consumerQuota)
+ return;
+ if (consumerQuota->ReadSpeedLimiter) {
+ Send(
+ consumerQuota->ReadSpeedLimiter->Actor,
+ new NAccountReadQuoterEvents::TEvConsumed(ev->Get()->ReadBytes, ev->Get()->ReadRequestCookie)
+ );
+ }
+ consumerQuota->PartitionPerConsumerQuotaTracker.Exaust(ev->Get()->ReadBytes, ctx.Now());
+}
+
+void TReadQuoter::HandleConfigUpdate(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) {
+ PQTabletConfig = ev->Get()->Config;
+ UpdateQuota(ctx);
+ //UpdateConsumersWithCustomQuota(ctx); // depricated. Delete this after 01.10.2023
+}
+
+void TReadQuoter::HandleUpdateAccountQuotaCounters(NAccountReadQuoterEvents::TEvCounters::TPtr& ev, const TActorContext&) {
+ auto consumerQuota = GetIfExists(ev->Get()->User);
+ if(!consumerQuota)
+ return;
+ if (consumerQuota->ReadSpeedLimiter) {
+ auto diff = ev->Get()->Counters.MakeDiffForAggr(consumerQuota->ReadSpeedLimiter->Baseline);
+ ev->Get()->Counters.RememberCurrentStateAsBaseline(consumerQuota->ReadSpeedLimiter->Baseline);
+ Send(PartitionActor, new NReadQuoterEvents::TEvQuotaCountersUpdated(diff));
+ }
+}
+
+void TReadQuoter::HandlePoisonPill(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
+ for (auto& consumerQuota : ConsumerQuotas) {
+ if(consumerQuota.second.ReadSpeedLimiter) {
+ Send(consumerQuota.second.ReadSpeedLimiter->Actor, new TEvents::TEvPoisonPill());
+ }
+ }
+ ConsumerQuotas.clear();
+ Die(ctx);
+}
+
+void TReadQuoter::UpdateQuota(const TActorContext &ctx) {
+ TVector<std::pair<TString, ui64>> updatedQuotas;
+ for (auto& [consumerStr, consumerQuota] : ConsumerQuotas) {
+ if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged(
+ GetConsumerReadBurst(ctx),
+ GetConsumerReadSpeed(ctx))) {
+ updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()});
+ }
+ }
+ auto totalQuotaUpdated = PartitionTotalQuotaTracker.UpdateConfigIfChanged(GetTotalPartitionReadBurst(ctx), GetTotalPartitionReadSpeed(ctx));
+ if (updatedQuotas.size() || totalQuotaUpdated) {
+ Send(PartitionActor, new NReadQuoterEvents::TEvQuotaUpdated(updatedQuotas, PartitionTotalQuotaTracker.GetTotalSpeed()));
+ }
+}
+
+void TReadQuoter::UpdateConsumersWithCustomQuota(const TActorContext &ctx) {
+ TVector<std::pair<TString, ui64>> updatedQuotas;
+ for (const auto& readQuota : PQTabletConfig.GetPartitionConfig().GetReadQuota()) {
+ auto consumerQuota = GetOrCreateConsumerQuota(readQuota.GetClientId(), ctx);
+ if (consumerQuota->PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond())) {
+ updatedQuotas.push_back({readQuota.GetClientId(), consumerQuota->PartitionPerConsumerQuotaTracker.GetTotalSpeed()});
+ }
+ }
+ if (updatedQuotas.size()) {
+ Send(PartitionActor, new NReadQuoterEvents::TEvQuotaUpdated(updatedQuotas, PartitionTotalQuotaTracker.GetTotalSpeed()));
+ }
+}
+
+ui64 TReadQuoter::GetConsumerReadSpeed(const TActorContext& ctx) const {
+ return AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota() ?
+ PQTabletConfig.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2
+ : DEFAULT_READ_SPEED_AND_BURST;
+}
+
+ui64 TReadQuoter::GetConsumerReadBurst(const TActorContext& ctx) const {
+ return AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota() ?
+ PQTabletConfig.GetPartitionConfig().GetBurstSize() * 2
+ : DEFAULT_READ_SPEED_AND_BURST;
+}
+
+ui64 TReadQuoter::GetTotalPartitionReadSpeed(const TActorContext& ctx) const {
+ auto consumersPerPartition = AppData(ctx)->PQConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition();
+ return GetConsumerReadSpeed(ctx) * consumersPerPartition;
+}
+
+ui64 TReadQuoter::GetTotalPartitionReadBurst(const TActorContext& ctx) const {
+ auto consumersPerPartition = AppData(ctx)->PQConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition();
+ return GetConsumerReadBurst(ctx) * consumersPerPartition;
+}
+
+TConsumerReadQuota* TReadQuoter::GetOrCreateConsumerQuota(const TString& consumerStr, const TActorContext& ctx) {
+ Y_VERIFY(!consumerStr.empty());
+ auto it = ConsumerQuotas.find(consumerStr);
+ if (it == ConsumerQuotas.end()) {
+ TConsumerReadQuota consumer(CreateReadSpeedLimiter(consumerStr), GetConsumerReadBurst(ctx), GetConsumerReadSpeed(ctx));
+ auto result = ConsumerQuotas.emplace(consumerStr, std::move(consumer));
+ Y_VERIFY(result.second);
+ Send(PartitionActor, new NReadQuoterEvents::TEvQuotaUpdated({{consumerStr, consumer.PartitionPerConsumerQuotaTracker.GetTotalSpeed()}}, PartitionTotalQuotaTracker.GetTotalSpeed()));
+ return &result.first->second;
+ }
+ return &it->second;
+}
+
+TConsumerReadQuota* TReadQuoter::GetIfExists(const TString& consumerStr) {
+ auto it = ConsumerQuotas.find(consumerStr);
+ return it != ConsumerQuotas.end() ? &it->second : nullptr;
+}
+
+void TReadQuoter::ScheduleWakeUp(const TActorContext& ctx) {
+ ctx.Schedule(WAKE_UP_TIMEOUT, new TEvents::TEvWakeup());
+}
+
+THolder<TAccountReadQuoterHolder> TReadQuoter::CreateReadSpeedLimiter(const TString& user) const {
+ const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig();
+ if (TabletActor && quotingConfig.GetEnableQuoting() && quotingConfig.GetEnableReadQuoting()) {
+ TActorId actorId = TActivationContext::Register(
+ new TAccountReadQuoter(
+ TabletActor,
+ SelfId(),
+ TabletId,
+ TopicConverter,
+ Partition,
+ user,
+ Counters
+ ),
+ PartitionActor
+ );
+ return MakeHolder<TAccountReadQuoterHolder>(actorId, Counters);
+ }
+ return nullptr;
+}
+
+TQuotaTracker TReadQuoter::CreatePartitionTotalQuotaTracker(const TActorContext& ctx) const {
+ return {GetTotalPartitionReadBurst(ctx), GetTotalPartitionReadSpeed(ctx), ctx.Now()};
+}
+
+}// NPQ
+}// NKikimr
diff --git a/ydb/core/persqueue/read_quoter.h b/ydb/core/persqueue/read_quoter.h
new file mode 100644
index 00000000000..6b87bc0605a
--- /dev/null
+++ b/ydb/core/persqueue/read_quoter.h
@@ -0,0 +1,150 @@
+#pragma once
+
+#include "quota_tracker.h"
+#include "account_read_quoter.h"
+#include "user_info.h"
+
+#include <ydb/core/quoter/public/quoter.h>
+#include <ydb/core/persqueue/events/internal.h>
+
+#include <library/cpp/actors/core/hfunc.h>
+
+
+namespace NKikimr {
+namespace NPQ {
+
+namespace NReadQuoterEvents {
+
+struct TEvQuotaUpdated : public TEventLocal<TEvQuotaUpdated, TEvPQ::EvQuotaUpdated> {
+ TEvQuotaUpdated(TVector<std::pair<TString, ui64>> updatedConsumerQuotas, ui64 updatedTotalPartitionQuota)
+ : UpdatedConsumerQuotas(std::move(updatedConsumerQuotas)),
+ UpdatedTotalPartitionQuota(updatedTotalPartitionQuota)
+ {}
+
+ TVector<std::pair<TString, ui64>> UpdatedConsumerQuotas;
+ ui64 UpdatedTotalPartitionQuota;
+};
+
+struct TEvQuotaCountersUpdated : public TEventLocal<TEvQuotaCountersUpdated, TEvPQ::EvQuotaCountersUpdated> {
+ TEvQuotaCountersUpdated(TAutoPtr<TTabletCountersBase> counters)
+ : Counters(std::move(counters))
+ {}
+
+ TAutoPtr<TTabletCountersBase> Counters;
+};
+
+}// NReadQuoterEvents
+
+struct TAccountReadQuoterHolder {
+ TAccountReadQuoterHolder(const TActorId& actor, const TTabletCountersBase& baseline)
+ : Actor(actor)
+ {
+ Baseline.Populate(baseline);
+ }
+
+ TActorId Actor;
+ TTabletCountersBase Baseline;
+};
+
+class TConsumerReadQuota {
+ public:
+ TConsumerReadQuota(THolder<TAccountReadQuoterHolder> readSpeedLimiter, ui64 readQuotaBurst, ui64 readQuotaSpeed):
+ PartitionPerConsumerQuotaTracker(readQuotaBurst, readQuotaSpeed, TAppData::TimeProvider->Now()),
+ ReadSpeedLimiter(std::move(readSpeedLimiter))
+ { }
+ public:
+ TQuotaTracker PartitionPerConsumerQuotaTracker;
+ THolder<TAccountReadQuoterHolder> ReadSpeedLimiter;
+ std::deque<TEvPQ::TEvRead::TPtr> ReadRequests;
+};
+
+class TReadQuoter : public TActorBootstrapped<TReadQuoter> {
+public:
+ TReadQuoter(
+ const TActorContext& ctx,
+ TActorId partitionActor,
+ const NPersQueue::TTopicConverterPtr& topicConverter,
+ const NKikimrPQ::TPQTabletConfig& config,
+ ui32 partition,
+ TActorId tabletActor,
+ ui64 tabletId,
+ const TTabletCountersBase& counters
+ ):
+ TabletActor(tabletActor),
+ PartitionActor(partitionActor),
+ PQTabletConfig(config),
+ PartitionTotalQuotaTracker(CreatePartitionTotalQuotaTracker(ctx)),
+ TopicConverter(topicConverter),
+ Partition(partition),
+ TabletId(tabletId)
+ {
+ Counters.Populate(counters);
+ }
+
+private:
+ STFUNC(StateWork)
+ {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvPQ::TEvRequestQuota, HandleQuotaRequest);
+ HFunc(TEvents::TEvWakeup, HandleWakeUp);
+ HFunc(NAccountReadQuoterEvents::TEvResponse, HandleAccountQuotaApproved);
+ HFunc(TEvPQ::TEvConsumed, HandleConsumed);
+ HFunc(TEvPQ::TEvChangePartitionConfig, HandleConfigUpdate);
+ HFunc(NAccountReadQuoterEvents::TEvCounters, HandleUpdateAccountQuotaCounters);
+ HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
+ HFunc(TEvPQ::TEvConsumerRemoved, HandleConsumerRemoved);
+ default:
+ break;
+ };
+ }
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::PERSQUEUE_READ_QUOTER;
+ }
+
+ void Bootstrap(const TActorContext &ctx);
+
+ void HandleQuotaRequest(TEvPQ::TEvRequestQuota::TPtr& ev,const TActorContext& ctx);
+ void HandleAccountQuotaApproved(NAccountReadQuoterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx);
+ void HandleWakeUp(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
+ void HandleConsumed(TEvPQ::TEvConsumed::TPtr& ev, const TActorContext& ctx);
+ void HandlePoisonPill(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
+ void HandleConfigUpdate(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx);
+ void HandleUpdateAccountQuotaCounters(NAccountReadQuoterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx);
+ void HandleConsumerRemoved(TEvPQ::TEvConsumerRemoved::TPtr& ev, const TActorContext& ctx);
+
+private:
+ TConsumerReadQuota* GetOrCreateConsumerQuota(const TString& consumerStr, const TActorContext& ctx);
+ THolder<TAccountReadQuoterHolder> CreateReadSpeedLimiter(const TString& user) const;
+ TQuotaTracker CreatePartitionTotalQuotaTracker(const TActorContext& ctx) const;
+ void CheckConsumerPerPartitionQuota(TEvPQ::TEvRead::TPtr, const TActorContext& ctx);
+ void CheckTotalPartitionQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx);
+ void ApproveQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx);
+ void ScheduleWakeUp(const TActorContext& ctx);
+ void UpdateConsumersWithCustomQuota(const TActorContext &ctx);
+ void UpdateQuota(const TActorContext &ctx);
+ void UpdateCounters(const TVector<std::pair<TString, ui64>>& updatedConsumerQuotas);
+ TConsumerReadQuota* GetIfExists(const TString& consumerStr);
+ ui64 GetConsumerReadSpeed(const TActorContext& ctx) const;
+ ui64 GetConsumerReadBurst(const TActorContext& ctx) const;
+ ui64 GetTotalPartitionReadSpeed(const TActorContext& ctx) const;
+ ui64 GetTotalPartitionReadBurst(const TActorContext& ctx) const;
+
+private:
+ TActorId TabletActor;
+ TActorId PartitionActor;
+ THashMap<TString, TConsumerReadQuota> ConsumerQuotas;
+ THashMap<ui64, TInstant> QuotaRequestedTimes;
+ std::deque<TEvPQ::TEvRead::TPtr> ReadRequests;
+ NKikimrPQ::TPQTabletConfig PQTabletConfig;
+ TQuotaTracker PartitionTotalQuotaTracker;
+ NPersQueue::TTopicConverterPtr TopicConverter;
+ const ui32 Partition;
+ ui64 TabletId;
+ TTabletCountersBase Counters;
+};
+
+
+}// NPQ
+}// NKikimr
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index b64c92609f7..317a761f20d 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -39,10 +39,8 @@ namespace NDeprecatedUserData {
TUsersInfoStorage::TUsersInfoStorage(
TString dcId,
- ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
ui32 partition,
- const TTabletCountersBase& counters,
const NKikimrPQ::TPQTabletConfig& config,
const TString& cloudId,
const TString& dbId,
@@ -51,7 +49,6 @@ TUsersInfoStorage::TUsersInfoStorage(
const TString& folderId
)
: DCId(std::move(dcId))
- , TabletId(tabletId)
, TopicConverter(topicConverter)
, Partition(partition)
, Config(config)
@@ -62,7 +59,6 @@ TUsersInfoStorage::TUsersInfoStorage(
, FolderId(folderId)
, CurReadRuleGeneration(0)
{
- Counters.Populate(counters);
}
void TUsersInfoStorage::Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx) {
@@ -141,10 +137,9 @@ void TUsersInfoStorage::Parse(const TString& key, const TString& data, const TAc
userInfo->Parsed = true;
}
-void TUsersInfoStorage::Remove(const TString& user, const TActorContext& ctx) {
+void TUsersInfoStorage::Remove(const TString& user, const TActorContext&) {
auto it = UsersInfo.find(user);
Y_VERIFY(it != UsersInfo.end());
- it->second.Clear(ctx);
UsersInfo.erase(it);
}
@@ -174,12 +169,6 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
ui32 gen, ui32 step, i64 offset, ui64 readOffsetRewindSum,
TInstant readFromTimestamp) const
{
- ui64 burst = 1'000'000'000, speed = 1'000'000'000;
- if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
- burst = Config.GetPartitionConfig().GetBurstSize() * 2;
- speed = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2;
- }
-
TString defaultServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName();
TString userServiceType = "";
for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) {
@@ -193,9 +182,10 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
return {
- ctx, StreamCountersSubgroup, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition,
+ ctx, StreamCountersSubgroup,
+ user, readRuleGeneration, important, TopicConverter, Partition,
session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, DbPath,
- meterRead, burst, speed
+ meterRead
};
}
@@ -216,38 +206,9 @@ TUserInfo& TUsersInfoStorage::Create(
return result.first->second;
}
-void TUsersInfoStorage::Clear(const TActorContext& ctx) {
- for (auto& userInfoPair : UsersInfo) {
- userInfoPair.second.Clear(ctx);
- }
+void TUsersInfoStorage::Clear(const TActorContext&) {
UsersInfo.clear();
}
-void TUserInfo::Clear(const TActorContext& ctx) {
- if (ReadSpeedLimiter) {
- ctx.Send(ReadSpeedLimiter->Actor, new TEvents::TEvPoisonPill());
- }
-}
-
-THolder<TReadSpeedLimiterHolder> TUsersInfoStorage::CreateReadSpeedLimiter(const TString& user) const {
- const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig();
- if (TabletActor && quotingConfig.GetEnableQuoting() && quotingConfig.GetEnableReadQuoting()) {
- TActorId actorId = TActivationContext::Register(
- new TReadSpeedLimiter(
- TabletActor.GetRef(),
- PartitionActor.GetRef(),
- TabletId,
- TopicConverter,
- Partition,
- user,
- Counters
- ),
- PartitionActor.GetRef()
- );
- return MakeHolder<TReadSpeedLimiterHolder>(actorId, Counters);
- }
- return nullptr;
-}
-
} //NPQ
} //NKikimr
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 8e5faf9d0e8..d76b4765013 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -4,7 +4,7 @@
#include "subscriber.h"
#include "percentile_counter.h"
#include "quota_tracker.h"
-#include "read_speed_limiter.h"
+#include "account_read_quoter.h"
#include "metering_sink.h"
#include <ydb/core/base/counters.h>
@@ -36,17 +36,6 @@ static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer";
typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters;
-struct TReadSpeedLimiterHolder {
- TReadSpeedLimiterHolder(const TActorId& actor, const TTabletCountersBase& baseline)
- : Actor(actor)
- {
- Baseline.Populate(baseline);
- }
-
- TActorId Actor;
- TTabletCountersBase Baseline;
-};
-
struct TUserInfoBase {
TString User;
ui64 ReadRuleGeneration = 0;
@@ -61,8 +50,6 @@ struct TUserInfoBase {
};
struct TUserInfo: public TUserInfoBase {
- THolder<TReadSpeedLimiterHolder> ReadSpeedLimiter;
-
TInstant WriteTimestamp;
TInstant CreateTimestamp;
TInstant ReadTimestamp;
@@ -83,16 +70,13 @@ struct TUserInfo: public TUserInfoBase {
THolder<TUserLabeledCounters> LabeledCounters;
NPersQueue::TTopicConverterPtr TopicConverter;
- std::deque<std::pair<TReadInfo, ui64>> ReadRequests;
-
- TQuotaTracker ReadQuota;
-
TWorkingTimeCounter Counter;
NKikimr::NPQ::TMultiCounter BytesRead;
NKikimr::NPQ::TMultiCounter MsgsRead;
TMap<TString, NKikimr::NPQ::TMultiCounter> BytesReadFromDC;
ui32 ActiveReads;
+ ui32 ReadsInQuotaQueue;
ui32 Subscriptions;
i64 EndOffset;
@@ -113,7 +97,7 @@ struct TUserInfo: public TUserInfoBase {
}
void UpdateReadingState() {
- Counter.UpdateState(Subscriptions > 0 || ActiveReads > 0 || ReadRequests.size() > 0); //no data for read or got read requests from client
+ Counter.UpdateState(Subscriptions > 0 || ActiveReads > 0 || ReadsInQuotaQueue > 0); //no data for read or got read requests from client
}
void UpdateReadingTimeAndState(TInstant now) {
@@ -155,7 +139,6 @@ struct TUserInfo: public TUserInfoBase {
if (it != BytesReadFromDC.end())
it->second.Inc(readSize);
}
- ReadQuota.Exaust(readSize, now);
for (auto& avg : AvgReadBytes) {
avg.Update(readSize, now);
}
@@ -167,15 +150,14 @@ struct TUserInfo: public TUserInfoBase {
TUserInfo(
const TActorContext& ctx,
- NMonitoring::TDynamicCounterPtr streamCountersSubgroup, THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user,
+ NMonitoring::TDynamicCounterPtr streamCountersSubgroup,
+ const TString& user,
const ui64 readRuleGeneration, const bool important, const NPersQueue::TTopicConverterPtr& topicConverter,
const ui32 partition, const TString &session, ui32 gen, ui32 step, i64 offset,
const ui64 readOffsetRewindSum, const TString& dcId, TInstant readFromTimestamp,
- const TString& dbPath, bool meterRead,
- ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000
+ const TString& dbPath, bool meterRead
)
: TUserInfoBase{user, readRuleGeneration, session, gen, step, offset, important, readFromTimestamp}
- , ReadSpeedLimiter(std::move(readSpeedLimiter))
, WriteTimestamp(TAppData::TimeProvider->Now())
, CreateTimestamp(TAppData::TimeProvider->Now())
, ReadTimestamp(TAppData::TimeProvider->Now())
@@ -187,9 +169,9 @@ struct TUserInfo: public TUserInfoBase {
, ReadScheduled(false)
, HasReadRule(false)
, TopicConverter(topicConverter)
- , ReadQuota(burst, speed, TAppData::TimeProvider->Now())
, Counter(nullptr)
, ActiveReads(0)
+ , ReadsInQuotaQueue(0)
, Subscriptions(0)
, EndOffset(0)
, AvgReadBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000},
@@ -277,12 +259,6 @@ struct TUserInfo: public TUserInfoBase {
}
- void SetQuota(const ui64 maxBurst, const ui64 speed) {
- ReadQuota.UpdateConfig(maxBurst, speed);
- }
-
- void Clear(const TActorContext& ctx);
-
void UpdateReadOffset(const i64 offset, TInstant writeTimestamp, TInstant createTimestamp, TInstant now) {
ReadOffset = offset;
ReadWriteTimestamp = writeTimestamp;
@@ -377,9 +353,16 @@ struct TUserInfo: public TUserInfoBase {
class TUsersInfoStorage {
public:
- TUsersInfoStorage(TString dcId, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, ui32 partition,
- const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config,
- const TString& CloudId, const TString& DbId, const TString& DbPath, const bool isServerless, const TString& FolderId);
+ TUsersInfoStorage(
+ TString dcId,
+ const NPersQueue::TTopicConverterPtr& topicConverter,
+ ui32 partition,
+ const NKikimrPQ::TPQTabletConfig& config,
+ const TString& CloudId,
+ const TString& DbId,
+ const TString& DbPath,
+ const bool isServerless,
+ const TString& FolderId);
void Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx);
@@ -407,7 +390,6 @@ public:
void Remove(const TString& user, const TActorContext& ctx);
private:
- THolder<TReadSpeedLimiterHolder> CreateReadSpeedLimiter(const TString& user) const;
TUserInfo CreateUserInfo(const TActorContext& ctx,
const TString& user,
@@ -421,10 +403,8 @@ private:
THashMap<TString, TUserInfo> UsersInfo;
const TString DCId;
- ui64 TabletId;
NPersQueue::TTopicConverterPtr TopicConverter;
const ui32 Partition;
- TTabletCountersBase Counters;
NMonitoring::TDynamicCounterPtr StreamCountersSubgroup;
TMaybe<TActorId> TabletActor;
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index e3e2e27da00..c291086ed37 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -68,28 +68,28 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
tabletConfig->AddReadRules("user");
tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs);
tabletConfig->SetMeteringMode(parameters.meteringMode);
- auto config = tabletConfig->MutablePartitionConfig();
- if (parameters.speed > 0) {
- config->SetWriteSpeedInBytesPerSecond(parameters.speed);
- config->SetBurstSize(parameters.speed);
+ auto partitionConfig = tabletConfig->MutablePartitionConfig();
+ if (parameters.writeSpeed > 0) {
+ partitionConfig->SetWriteSpeedInBytesPerSecond(parameters.writeSpeed);
+ partitionConfig->SetBurstSize(parameters.writeSpeed);
}
- config->SetMaxCountInPartition(parameters.maxCountInPartition);
- config->SetMaxSizeInPartition(parameters.maxSizeInPartition);
+ partitionConfig->SetMaxCountInPartition(parameters.maxCountInPartition);
+ partitionConfig->SetMaxSizeInPartition(parameters.maxSizeInPartition);
if (parameters.storageLimitBytes > 0) {
- config->SetStorageLimitBytes(parameters.storageLimitBytes);
+ partitionConfig->SetStorageLimitBytes(parameters.storageLimitBytes);
} else {
- config->SetLifetimeSeconds(parameters.deleteTime);
+ partitionConfig->SetLifetimeSeconds(parameters.deleteTime);
}
- config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds());
+ partitionConfig->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds());
if (parameters.sidMaxCount > 0)
- config->SetSourceIdMaxCounts(parameters.sidMaxCount);
- config->SetMaxWriteInflightSize(90'000'000);
- config->SetLowWatermark(parameters.lowWatermark);
+ partitionConfig->SetSourceIdMaxCounts(parameters.sidMaxCount);
+ partitionConfig->SetMaxWriteInflightSize(90'000'000);
+ partitionConfig->SetLowWatermark(parameters.lowWatermark);
for (auto& u : users) {
if (u.second)
- config->AddImportantClientId(u.first);
+ partitionConfig->AddImportantClientId(u.first);
if (u.first != "user")
tabletConfig->AddReadRules(u.first);
}
@@ -864,7 +864,7 @@ TVector<TString> CmdSourceIdRead(TTestContext& tc) {
}
-void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets, const ui32 maxTimeLagMs, const ui64 readTimestampMs) {
+void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets, const ui32 maxTimeLagMs, const ui64 readTimestampMs, const TString user) {
TAutoPtr<IEventHandle> handle;
TEvPersQueue::TEvResponse *result;
THolder<TEvPersQueue::TEvRequest> request;
@@ -877,7 +877,7 @@ void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui
req->SetPartition(partition);
auto read = req->MutableCmdRead();
read->SetOffset(offset);
- read->SetClientId("user");
+ read->SetClientId(user);
read->SetCount(count);
read->SetBytes(size);
if (maxTimeLagMs > 0) {
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 003c89642a4..0f32885afcc 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -77,7 +77,6 @@ public:
}
};
-
struct TTestContext {
const TTabletTypes::EType PQTabletType = TTabletTypes::PersQueue;
const TTabletTypes::EType BalancerTabletType = TTabletTypes::PersQueueReadBalancer;
@@ -253,7 +252,7 @@ struct TTabletPreparationParameters {
ui64 readFromTimestampsMs{0};
ui64 sidMaxCount{0};
ui32 specVersion{0};
- ui32 speed{0};
+ ui32 writeSpeed{0};
i32 storageLimitBytes{0};
TString folderId{"somefolder"};
TString cloudId{"somecloud"};
@@ -458,7 +457,8 @@ void CmdRead(
TTestContext& tc,
TVector<i32> offsets = {},
const ui32 maxTimeLagMs = 0,
- const ui64 readTimestampMs = 0);
+ const ui64 readTimestampMs = 0,
+ const TString user = "user");
void CmdReserveBytes(
const ui32 partition,
diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp
index ec85e940c66..dac1905f924 100644
--- a/ydb/core/persqueue/ut/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/pq_ut.cpp
@@ -20,6 +20,70 @@ const static TString TOPIC_NAME = "rt3.dc1--topic";
Y_UNIT_TEST_SUITE(TPQTest) {
+Y_UNIT_TEST(TestPartitionTotalQuota) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ activeZone = false;
+ tc.Runtime->SetScheduledLimit(1000);
+
+ tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true);
+ tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetMaxParallelConsumersPerPartition(1); //total partition quota is equal to quota per consumer. Very low.
+
+ PQTabletPrepare({.partitions = 1, .writeSpeed = 100_KB}, {{"important_user", true}}, tc);
+ TVector<std::pair<ui64, TString>> data;
+ TString s{2_MB, 'c'};
+ data.push_back({1, s});
+ CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true);
+
+ //check throttling on total partition quota
+ auto startTime = tc.Runtime->GetTimeProvider()->Now();
+ CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user1");
+ CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user2");
+ auto diff = (tc.Runtime->GetTimeProvider()->Now() - startTime).Seconds();
+ UNIT_ASSERT(diff >= 9); //read quota is twice write quota. So, it's 200kb per seconds and 200kb burst. (2mb - 200kb) / 200kb = 9 seconds needed to get quota
+ });
+}
+
+Y_UNIT_TEST(TestPartitionPerConsumerQuota) {
+ TTestContext tc;
+ RunTestWithReboots(tc.TabletIds, [&]() {
+ return tc.InitialEventsFilter.Prepare();
+ }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
+ TFinalizer finalizer(tc);
+ tc.Prepare(dispatchName, setup, activeZone);
+ activeZone = false;
+ tc.Runtime->SetScheduledLimit(1000);
+
+ tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true);
+ tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetMaxParallelConsumersPerPartition(1000); //total partition quota is 1 consumer quota * 1000. Very high.
+
+
+ PQTabletPrepare({.partitions = 1, .writeSpeed = 100_KB}, {{"important_user", true}}, tc);
+ TVector<std::pair<ui64, TString>> data;
+ TString s{2_MB, 'c'};
+ data.push_back({1, s});
+ CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true);
+
+ //check throttling on per consumer quota
+ auto startTimeReadWithSameConsumer = tc.Runtime->GetTimeProvider()->Now();
+ CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user1");
+ CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user1");
+ auto diffReadWithSameConsumers = (tc.Runtime->GetTimeProvider()->Now() - startTimeReadWithSameConsumer).Seconds();
+ UNIT_ASSERT(diffReadWithSameConsumers >= 9); //read quota is twice write quota. So, it's 200kb per seconds and 200kb burst. (2mb - 200kb) / 200kb = 9 seconds needed to get quota
+
+ //check not throttling on total partition quota
+ auto startTimeReadWithDifferentConsumers = tc.Runtime->GetTimeProvider()->Now();
+ CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user2");
+ CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user3");
+ auto diffReadWithDifferentConsumers = (tc.Runtime->GetTimeProvider()->Now() - startTimeReadWithDifferentConsumers).Seconds();
+ UNIT_ASSERT(diffReadWithDifferentConsumers <= 1); //different consumers. No throttling
+ });
+}
+
Y_UNIT_TEST(TestGroupsBalancer) {
TTestContext tc;
TFinalizer finalizer(tc);
@@ -1056,7 +1120,7 @@ void TestWritePQImpl(bool fast) {
tc.Runtime->SetScheduledLimit(100);
// Important client, lifetimeseconds=0 - never delete
- PQTabletPrepare({.partitions = 2, .speed = 200000000}, {{"user", true}}, tc);
+ PQTabletPrepare({.partitions = 2, .writeSpeed = 200000000}, {{"user", true}}, tc);
TVector<std::pair<ui64, TString>> data, data1, data2;
activeZone = PlainOrSoSlow(true, false) && fast;
diff --git a/ydb/core/persqueue/ut/resources/counters_labeled.json b/ydb/core/persqueue/ut/resources/counters_labeled.json
index 85804aa58b8..28fd813977d 100644
--- a/ydb/core/persqueue/ut/resources/counters_labeled.json
+++ b/ydb/core/persqueue/ut/resources/counters_labeled.json
@@ -1817,24 +1817,6 @@
"labels": {
"user_counters": "PersQueue",
"topic": "rt3.dc1--asdfgs--topic",
- "sensor": "PQ/ReserveLimitBytes"
- },
- "value": 0
- },
- {
- "kind": "GAUGE",
- "labels": {
- "user_counters": "PersQueue",
- "topic": "rt3.dc1--asdfgs--topic",
- "sensor": "PQ/ReserveUsedBytes"
- },
- "value": 0
- },
- {
- "kind": "GAUGE",
- "labels": {
- "user_counters": "PersQueue",
- "topic": "rt3.dc1--asdfgs--topic",
"sensor": "PQ/SourceIdCount"
},
"value": 3
@@ -2141,24 +2123,6 @@
"labels": {
"user_counters": "PersQueue",
"topic": "total",
- "sensor": "PQ/ReserveLimitBytes"
- },
- "value": 0
- },
- {
- "kind": "GAUGE",
- "labels": {
- "user_counters": "PersQueue",
- "topic": "total",
- "sensor": "PQ/ReserveUsedBytes"
- },
- "value": 0
- },
- {
- "kind": "GAUGE",
- "labels": {
- "user_counters": "PersQueue",
- "topic": "total",
"sensor": "PQ/SourceIdCount"
},
"value": 3
diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html
index 1bf45fdec3f..adbf9f9247b 100644
--- a/ydb/core/persqueue/ut/resources/counters_topics.html
+++ b/ydb/core/persqueue/ut/resources/counters_topics.html
@@ -13,6 +13,8 @@ host=:
name=topic.partition.alive_count: 1
name=topic.partition.init_duration_milliseconds_max: 0
name=topic.partition.producers_count_max: 3
+ name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
+ name=topic.partition.read.throttled_microseconds_max: 0
name=topic.partition.storage_bytes_max: 747
name=topic.partition.total_count: 2
name=topic.partition.uptime_milliseconds_min: 30000
@@ -22,7 +24,7 @@ host=:
name=topic.partition.write.idle_milliseconds_max: 0
name=topic.partition.write.lag_milliseconds_max: 600
name=topic.partition.write.speed_limit_bytes_per_second: 50000000
- name=topic.partition.write.throttled_nanoseconds_max: 0
+ name=topic.partition.write.throttled_microseconds_max: 0
name=topic.producers_count: 3
name=topic.reserve.limit_bytes: 0
name=topic.reserve.used_bytes: 0
@@ -37,6 +39,8 @@ host=:
name=topic.partition.read.idle_milliseconds_max: 30000
name=topic.partition.read.lag_messages_max: 29
name=topic.partition.read.lag_milliseconds_max: 0
+ name=topic.partition.read.speed_limit_bytes_per_second: 1000000000
+ name=topic.partition.read.throttled_microseconds_max: 0
name=topic.partition.write.lag_milliseconds_max: 200
name=topic.read.lag_messages: 29
</pre>
diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make
index 520b06b0789..b14b9184059 100644
--- a/ydb/core/persqueue/ya.make
+++ b/ydb/core/persqueue/ya.make
@@ -22,7 +22,8 @@ SRCS(
pq_l2_cache.cpp
quota_tracker.cpp
read_balancer.cpp
- read_speed_limiter.cpp
+ account_read_quoter.cpp
+ read_quoter.cpp
sourceid.cpp
subscriber.cpp
transaction.cpp
diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto
index 466de907f89..f242539fcf7 100644
--- a/ydb/core/protos/counters_pq.proto
+++ b/ydb/core/protos/counters_pq.proto
@@ -172,13 +172,13 @@ enum EClientLabeledCounters {
METRIC_TOTAL_READ_SPEED_4 = 20 [(LabeledCounterOpts) = {Name: "ReadBytesPerDay" AggrFunc : EAF_SUM SVName: ""}];
METRIC_MAX_READ_SPEED_4 = 21 [(LabeledCounterOpts) = {Name: "ReadBytesMaxPerDay" AggrFunc : EAF_MAX SVName: ""}];
- METRIC_READ_QUOTA_BYTES = 22 [(LabeledCounterOpts) = {Name: "ReadBytesQuota" AggrFunc : EAF_MIN SVName: ""}];
+ METRIC_READ_QUOTA_PER_CONSUMER_BYTES = 22 [(LabeledCounterOpts) = {Name: "ReadBytesQuota" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}];
METRIC_READ_TIME_LAG = 23 [(LabeledCounterOpts) = {Name: "ReadTimeLagMs" AggrFunc : EAF_MAX SVName: "topic.partition.read.lag_milliseconds_max"}];
METRIC_WRITE_TIME_LAG = 24 [(LabeledCounterOpts) = {Name: "WriteTimeLagMsByLastRead" AggrFunc : EAF_MAX SVName: "topic.partition.write.lag_milliseconds_max"}];
METRIC_LAST_READ_TIME = 25 [(LabeledCounterOpts) = {Name: "TimeSinceLastReadMs" AggrFunc : EAF_MIN Type : CT_TIMELAG SVName: "topic.partition.read.idle_milliseconds_max"}];
- METRIC_READ_QUOTA_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX SVName: ""}];
+ METRIC_READ_QUOTA_PER_CONSUMER_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX SVName: "topic.partition.read.throttled_microseconds_max"}];
}
@@ -224,11 +224,13 @@ enum EPartitionLabeledCounters {
METRIC_MAX_QUOTA_SPEED_3 = 29 [(LabeledCounterOpts) = {Name: "QuotaBytesMaxPerHour" AggrFunc : EAF_MAX SVName: "topic.partition.write.bytes_per_hour_max"}];
METRIC_TOTAL_QUOTA_SPEED_4 = 30 [(LabeledCounterOpts) = {Name: "QuotaBytesPerDay" AggrFunc : EAF_SUM SVName: ""}];
METRIC_MAX_QUOTA_SPEED_4 = 31 [(LabeledCounterOpts) = {Name: "QuotaBytesMaxPerDay" AggrFunc : EAF_MAX SVName: "topic.partition.write.bytes_per_day_max"}];
- METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX SVName: "topic.partition.write.throttled_nanoseconds_max"}];
+ METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX SVName: "topic.partition.write.throttled_microseconds_max"}];
METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN SVName: ""}];
METRIC_PARTITIONS_TOTAL = 34 [(LabeledCounterOpts) = {Name: "PartitionsTotal" AggrFunc : EAF_MAX SVName: "topic.partition.total_count"}];
- METRIC_RESERVE_LIMIT_BYTES = 35 [(LabeledCounterOpts) = {Name: "ReserveLimitBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.limit_bytes"}];
- METRIC_RESERVE_USED_BYTES = 36 [(LabeledCounterOpts) = {Name: "ReserveUsedBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.used_bytes"}];
+ METRIC_RESERVE_LIMIT_BYTES = 35 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_SUM SVName: "topic.reserve.limit_bytes"}];
+ METRIC_RESERVE_USED_BYTES = 36 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_SUM SVName: "topic.reserve.used_bytes"}];
+ METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE = 37 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.throttled_microseconds_max"}];
+ METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}];
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 1e3b26f44e4..e92d51cf9bb 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -80,6 +80,8 @@ message TPQConfig {
optional bool PartitionReadQuotaIsTwiceWriteQuota = 7 [default = false];
+ optional uint32 MaxParallelConsumersPerPartition = 8 [default = 20];
+
}
optional TQuotingConfig QuotingConfig = 18;
diff --git a/ydb/core/tablet/private/aggregated_counters.cpp b/ydb/core/tablet/private/aggregated_counters.cpp
index f4d0424a261..81acc72c393 100644
--- a/ydb/core/tablet/private/aggregated_counters.cpp
+++ b/ydb/core/tablet/private/aggregated_counters.cpp
@@ -545,6 +545,8 @@ void TAggregatedLabeledCounters::FillGetRequestV1(
labeledCounters->SetGroup(group);
labeledCounters->SetDelimiter("/"); //TODO: change here to "|"
for (ui32 i = start; i < end; ++i) {
+ if (strlen(Names[i]) == 0 || strcmp(Names[i], "PQ/") == 0)
+ continue;
auto& labeledCounter = *labeledCounters->AddLabeledCounter();
labeledCounter.SetValue(GetValue(i));
labeledCounter.SetId(GetId(i));
@@ -570,7 +572,7 @@ void TAggregatedLabeledCounters::FillGetRequestV2(
labeledCounter.SetValue(GetValue(i));
labeledCounter.SetNameId(context->GetNameId(Names[i]));
labeledCounter.SetAggregateFunc(NKikimr::TLabeledCounterOptions::EAggregateFunc(AggrFunc[i]));
- labeledCounter.SetType(NKikimr::TLabeledCounterOptions::ECounterType(Types[i]));
+ labeledCounter.SetType(NKikimr::TLabeledCounterOptions::ECounterType(Types[i]));
}
}
diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp
index 4cbf9e46325..3f4549c7cda 100644
--- a/ydb/core/tablet/tablet_counters_aggregator.cpp
+++ b/ydb/core/tablet/tablet_counters_aggregator.cpp
@@ -157,6 +157,8 @@ public:
}
for (ui32 i = 0, e = labeledCounters->GetCounters().Size(); i < e; ++i) {
+ if(!strlen(labeledCounters->GetCounterName(i)))
+ continue;
const ui64& value = labeledCounters->GetCounters()[i].Get();
const ui64& id = labeledCounters->GetIds()[i].Get();
iterTabletType->second->SetValue(tabletId, i, value, id);
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index db1b977b166..f9b74e17657 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -836,7 +836,7 @@ message TActivity {
PERSQUEUE_MIRRORER = 451;
LONG_TX_SERVICE = 452;
LONG_TX_SERVICE_COMMIT = 453;
- PERSQUEUE_READ_SPEED_LIMITER = 454;
+ PERSQUEUE_ACCOUNT_READ_QUOTER = 454;
LONG_TX_SERVICE_ACQUIRE_SNAPSHOT = 455;
BLOCKSTORE_PARTITION_NONREPL = 456;
BS_PROXY_PATCH_ACTOR = 457;
@@ -995,5 +995,6 @@ message TActivity {
CMS_API_ADAPTER = 613;
KQP_SCAN_FETCH_ACTOR = 614;
COLUMNSHARD_CONVEYOR = 615;
+ PERSQUEUE_READ_QUOTER = 616;
};
};
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 6643bfe72df..0ded0094f30 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -813,6 +813,9 @@ message DescribeTopicResult {
// Zero value means default limit: 1 MB per second.
int64 partition_write_speed_bytes_per_second = 8;
+ int64 partition_total_read_speed_bytes_per_second = 14;
+ int64 partition_consumer_read_speed_bytes_per_second = 15;
+
// Burst size for write in partition, in bytes.
// Zero value means default limit: 1 MB.
int64 partition_write_burst_bytes = 9;
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 9058c9d1af6..8830ab95086 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -936,6 +936,12 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
Result.set_partition_write_burst_bytes(partConfig.GetBurstSize());
}
+ if (pqConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
+ auto readSpeedPerConsumer = partConfig.GetWriteSpeedInBytesPerSecond() * 2;
+ Result.set_partition_total_read_speed_bytes_per_second(readSpeedPerConsumer * pqConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition());
+ Result.set_partition_consumer_read_speed_bytes_per_second(readSpeedPerConsumer);
+ }
+
for (const auto &codec : config.GetCodecs().GetIds()) {
Result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
}
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 7b3ec66d9e5..79884fc97e4 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1992,32 +1992,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME);
}
-
- Y_UNIT_TEST(BigRead) {
- NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(24_MB));
- server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8_MB, 86400, 20000000, "user", 2000000);
-
- server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE });
-
- TString value(1_MB, 'x');
- for (ui32 i = 0; i < 32; ++i)
- server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);
-
- // trying to read small PQ messages in a big gRPC event
- auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb
- UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromDisk, 0);
- UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromCache, 4);
-
- TInstant now(TInstant::Now());
- info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb
- TDuration dur = TInstant::Now() - now;
- UNIT_ASSERT_C(dur > TDuration::Seconds(7) && dur < TDuration::Seconds(20), "dur = " << dur); //speed limit is 2000kb/s and burst is 2000kb, so to read 24mb it will take at least 11 seconds
-
- server.AnnoyingClient->GetPartStatus({}, 1, true);
-
- }
-
-
// expects that L2 size is 32Mb
Y_UNIT_TEST(Cache) {
NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(18_MB));