diff options
author | savnik <savnik@yandex-team.com> | 2023-07-18 10:04:41 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-07-18 10:04:41 +0300 |
commit | 75d2969cb699ede22cb09d3a6adab8906fc0ff3c (patch) | |
tree | 3e2cff5eaf6298daf3e784acff31eef33147a2e2 | |
parent | ccbe76e66014692c9945391d3321d5bd491d022b (diff) | |
download | ydb-75d2969cb699ede22cb09d3a6adab8906fc0ff3c.tar.gz |
Add total partition read quota
31 files changed, 688 insertions, 311 deletions
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt index 955cb5e244a..04e59e4d917 100644 --- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt @@ -64,7 +64,8 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index 02ed0e22e2f..549b7a7193c 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -65,7 +65,8 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt index 02ed0e22e2f..549b7a7193c 100644 --- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt @@ -65,7 +65,8 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt index 955cb5e244a..04e59e4d917 100644 --- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt @@ -64,7 +64,8 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_speed_limiter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_quoter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/sourceid.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/subscriber.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/transaction.cpp diff --git a/ydb/core/persqueue/read_speed_limiter.cpp b/ydb/core/persqueue/account_read_quoter.cpp index 0707426fe1b..f4f26eb74b6 100644 --- a/ydb/core/persqueue/read_speed_limiter.cpp +++ b/ydb/core/persqueue/account_read_quoter.cpp @@ -1,4 +1,4 @@ -#include "read_speed_limiter.h" +#include "account_read_quoter.h" #include "event_helpers.h" #include <ydb/core/base/appdata.h> @@ -18,15 +18,15 @@ namespace NPQ { const TDuration UPDATE_COUNTERS_INTERVAL = TDuration::Seconds(5); const TDuration DO_NOT_QUOTE_AFTER_ERROR_PERIOD = TDuration::Seconds(5); -const TString TReadSpeedLimiter::READ_QUOTA_ROOT_PATH = "read-quota"; +const TString TAccountReadQuoter::READ_QUOTA_ROOT_PATH = "read-quota"; -constexpr NKikimrServices::TActivity::EType TReadSpeedLimiter::ActorActivityType() { - return NKikimrServices::TActivity::PERSQUEUE_READ_SPEED_LIMITER; +constexpr NKikimrServices::TActivity::EType TAccountReadQuoter::ActorActivityType() { + return NKikimrServices::TActivity::PERSQUEUE_ACCOUNT_READ_QUOTER; } -TReadSpeedLimiter::TReadSpeedLimiter( +TAccountReadQuoter::TAccountReadQuoter( TActorId tabletActor, - TActorId partitionActor, + TActorId recepient, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, ui32 partition, @@ -34,7 +34,7 @@ TReadSpeedLimiter::TReadSpeedLimiter( const TTabletCountersBase& counters ) : TabletActor(tabletActor) - , PartitionActor(partitionActor) + , Recepient(recepient) , TabletId(tabletId) , TopicConverter(topicConverter) , Partition(partition) @@ -56,7 +56,7 @@ TReadSpeedLimiter::TReadSpeedLimiter( LimiterDescription() <<" kesus=" << KesusPath << " resource_path=" << QuotaResourcePath); } -void TReadSpeedLimiter::InitCounters(const TActorContext& ctx) { +void TAccountReadQuoter::InitCounters(const TActorContext& ctx) { if (CountersInited) { return; } @@ -81,12 +81,12 @@ void TReadSpeedLimiter::InitCounters(const TActorContext& ctx) { } -void TReadSpeedLimiter::Bootstrap(const TActorContext& ctx) { +void TAccountReadQuoter::Bootstrap(const TActorContext& ctx) { Become(&TThis::StateWork); ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); } -void TReadSpeedLimiter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { +void TAccountReadQuoter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, LimiterDescription() << " killed"); for (const auto& event : Queue) { auto cookie = event.Event->Get()->Cookie; @@ -99,12 +99,12 @@ void TReadSpeedLimiter::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContex Die(ctx); } -void TReadSpeedLimiter::HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr&, const TActorContext& ctx) { +void TAccountReadQuoter::HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr&, const TActorContext& ctx) { ctx.Schedule(UPDATE_COUNTERS_INTERVAL, new TEvPQ::TEvUpdateCounters); - ctx.Send(PartitionActor, new NReadSpeedLimiterEvents::TEvCounters(Counters, User)); + ctx.Send(Recepient, new NAccountReadQuoterEvents::TEvCounters(Counters, User)); } -void TReadSpeedLimiter::HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx) { +void TAccountReadQuoter::HandleReadQuotaRequest(NAccountReadQuoterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, LimiterDescription() << " quota required for cookie=" << ev->Get()->ReadRequest->Get()->Cookie ); @@ -117,7 +117,7 @@ void TReadSpeedLimiter::HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvReque } } -void TReadSpeedLimiter::HandleReadQuotaConsumed(NReadSpeedLimiterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx) { +void TAccountReadQuoter::HandleReadQuotaConsumed(NAccountReadQuoterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx) { ConsumedBytesInCredit += ev->Get()->ReadBytes; LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, LimiterDescription() << "consumed read quota " << ev->Get()->ReadBytes @@ -147,7 +147,7 @@ void TReadSpeedLimiter::HandleReadQuotaConsumed(NReadSpeedLimiterEvents::TEvCons } } -void TReadSpeedLimiter::HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx) { +void TAccountReadQuoter::HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx) { QuotaRequestInFlight = false; const ui64 cookie = ev->Cookie; LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, @@ -170,21 +170,21 @@ void TReadSpeedLimiter::HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const } } -void TReadSpeedLimiter::TReadSpeedLimiter::ApproveRead(TEvPQ::TEvRead::TPtr ev, TInstant startWait, const TActorContext& ctx) { +void TAccountReadQuoter::TAccountReadQuoter::ApproveRead(TEvPQ::TEvRead::TPtr ev, TInstant startWait, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_SPEED_LIMITER, LimiterDescription() << " approve read for cookie=" << ev->Get()->Cookie ); InProcessReadRequestCookies.insert(ev->Get()->Cookie); auto waitTime = ctx.Now() - startWait; - Send(PartitionActor, new NReadSpeedLimiterEvents::TEvResponse(ev.Release(), waitTime)); + Send(Recepient, new NAccountReadQuoterEvents::TEvResponse(ev.Release(), waitTime)); if (QuotaWaitCounter) { QuotaWaitCounter->IncFor(waitTime.MilliSeconds()); } } -TString TReadSpeedLimiter::LimiterDescription() const { +TString TAccountReadQuoter::LimiterDescription() const { return TStringBuilder() << "topic=" << TopicConverter->GetClientsideName() << ":" << Partition << " user=" << User << ": "; } diff --git a/ydb/core/persqueue/read_speed_limiter.h b/ydb/core/persqueue/account_read_quoter.h index 624e7416a21..bfd223c5168 100644 --- a/ydb/core/persqueue/read_speed_limiter.h +++ b/ydb/core/persqueue/account_read_quoter.h @@ -11,8 +11,8 @@ namespace NPQ { class TPercentileCounter; -namespace NReadSpeedLimiterEvents { - struct TEvRequest : public TEventLocal<TEvRequest, TEvPQ::EvReadLimiterRequest> { +namespace NAccountReadQuoterEvents { + struct TEvRequest : public TEventLocal<TEvRequest, TEvPQ::EvAccountReadQuotaRequest> { TEvRequest(TEvPQ::TEvRead::TPtr readRequest) : ReadRequest(std::move(readRequest)) {} @@ -20,7 +20,7 @@ namespace NReadSpeedLimiterEvents { TEvPQ::TEvRead::TPtr ReadRequest; }; - struct TEvResponse : public TEventLocal<TEvResponse, TEvPQ::EvReadLimiterResponse> { + struct TEvResponse : public TEventLocal<TEvResponse, TEvPQ::EvAccountReadQuotaResponse> { TEvResponse(TEvPQ::TEvRead::TPtr readRequest, TDuration waitTime) : ReadRequest(std::move(readRequest)) , WaitTime(waitTime) @@ -30,7 +30,7 @@ namespace NReadSpeedLimiterEvents { TDuration WaitTime; }; - struct TEvConsumed : public TEventLocal<TEvConsumed, TEvPQ::EvReadLimiterConsumed> { + struct TEvConsumed : public TEventLocal<TEvConsumed, TEvPQ::EvAccountReadQuotaConsumed> { TEvConsumed(ui64 readBytes, ui64 readRequestCookie) : ReadBytes(readBytes) , ReadRequestCookie(readRequestCookie) @@ -40,7 +40,7 @@ namespace NReadSpeedLimiterEvents { ui64 ReadRequestCookie; }; - struct TEvCounters : public TEventLocal<TEvCounters, TEvPQ::EvReadLimiterCounters> { + struct TEvCounters : public TEventLocal<TEvCounters, TEvPQ::EvAccountReadQuotaCounters> { TEvCounters(const NKikimr::TTabletCountersBase& counters, const TString& user) : User(user) { @@ -52,7 +52,7 @@ namespace NReadSpeedLimiterEvents { }; } -class TReadSpeedLimiter : public TActorBootstrapped<TReadSpeedLimiter> { +class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> { private: static const TString READ_QUOTA_ROOT_PATH; @@ -72,8 +72,8 @@ private: TRACE_EVENT(NKikimrServices::PQ_READ_SPEED_LIMITER); switch (ev->GetTypeRewrite()) { HFuncTraced(TEvPQ::TEvUpdateCounters, HandleUpdateCounters); - HFuncTraced(NReadSpeedLimiterEvents::TEvRequest, HandleReadQuotaRequest); - HFuncTraced(NReadSpeedLimiterEvents::TEvConsumed, HandleReadQuotaConsumed); + HFuncTraced(NAccountReadQuoterEvents::TEvRequest, HandleReadQuotaRequest); + HFuncTraced(NAccountReadQuoterEvents::TEvConsumed, HandleReadQuotaConsumed); HFuncTraced(TEvQuota::TEvClearance, HandleClearance); HFuncTraced(TEvents::TEvPoisonPill, Handle); default: @@ -84,9 +84,9 @@ private: public: static constexpr NKikimrServices::TActivity::EType ActorActivityType(); - TReadSpeedLimiter( + TAccountReadQuoter( TActorId tabletActor, - TActorId partitionActor, + TActorId recepient, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, ui32 partition, @@ -98,8 +98,8 @@ public: void InitCounters(const TActorContext& ctx); void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); void HandleUpdateCounters(TEvPQ::TEvUpdateCounters::TPtr& ev, const TActorContext& ctx); - void HandleReadQuotaRequest(NReadSpeedLimiterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx); - void HandleReadQuotaConsumed(NReadSpeedLimiterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx); + void HandleReadQuotaRequest(NAccountReadQuoterEvents::TEvRequest::TPtr& ev, const TActorContext& ctx); + void HandleReadQuotaConsumed(NAccountReadQuoterEvents::TEvConsumed::TPtr& ev, const TActorContext& ctx); void HandleClearance(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx); void ApproveRead(TEvPQ::TEvRead::TPtr ev, TInstant startWait, const TActorContext& ctx); @@ -109,7 +109,7 @@ private: private: const TActorId TabletActor; - const TActorId PartitionActor; + const TActorId Recepient; const ui64 TabletId; const NPersQueue::TTopicConverterPtr TopicConverter; const ui32 Partition; diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 56d77cc2d43..7c5e381ab29 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -107,10 +107,10 @@ struct TEvPQ { EvSplitMessageGroup, EvUpdateCounters, EvMirrorerCounters, - EvReadLimiterRequest, - EvReadLimiterResponse, - EvReadLimiterConsumed, - EvReadLimiterCounters, + EvAccountReadQuotaRequest, + EvAccountReadQuotaResponse, + EvAccountReadQuotaConsumed, + EvAccountReadQuotaCounters, EvRetryWrite, EvInitCredentials, EvCredentialsCreated, @@ -128,6 +128,12 @@ struct TEvPQ { EvPartitionConfigChanged, EvSubDomainStatus, EvStatsWakeup, + EvRequestQuota, + EvApproveQuota, + EvConsumed, + EvQuotaUpdated, + EvQuotaCountersUpdated, + EvConsumerRemoved, EvEnd }; @@ -794,6 +800,46 @@ struct TEvPQ { ui64 Round; }; + + struct TEvRequestQuota : public TEventLocal<TEvRequestQuota, EvRequestQuota> { + TEvRequestQuota(TEvPQ::TEvRead::TPtr readRequest) + : + ReadRequest(std::move(readRequest)) + {} + + TEvPQ::TEvRead::TPtr ReadRequest; + }; + + struct TEvApproveQuota : public TEventLocal<TEvApproveQuota, EvApproveQuota> { + TEvApproveQuota(TEvPQ::TEvRead::TPtr readRequest, TDuration waitTime) + : + ReadRequest(std::move(readRequest)), + WaitTime(std::move(waitTime)) + {} + + TEvPQ::TEvRead::TPtr ReadRequest; + TDuration WaitTime; + }; + + struct TEvConsumed : public TEventLocal<TEvConsumed, EvConsumed> { + TEvConsumed(ui64 readBytes, ui64 readRequestCookie, const TString& consumer) + : ReadBytes(readBytes), + ReadRequestCookie(readRequestCookie), + Consumer(consumer) + {} + + ui64 ReadBytes; + ui64 ReadRequestCookie; + TString Consumer; + }; + + struct TEvConsumerRemoved : public TEventLocal<TEvConsumerRemoved, EvConsumerRemoved> { + TEvConsumerRemoved(const TString& consumer) + : Consumer(consumer) + {} + + TString Consumer; + }; }; } //NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index faebc0aff8d..facb975b521 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -139,6 +139,7 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, co , DiskIsFull(false) , SubDomainOutOfSpace(subDomainOutOfSpace) , HasDataReqNum(0) + , AvgReadBytes(TDuration::Minutes(1), 1000) , AvgWriteBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}} , AvgQuotaBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, {TDuration::Hours(1), 2000}, {TDuration::Days(1), 2000}} , ReservedSize(0) @@ -194,7 +195,6 @@ ui64 TPartition::UsedReserveSize(const TActorContext& ctx) const { return std::min<ui64>(MeteringDataSize(ctx), ReserveSize()); } - ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { const auto now = ctx.Now(); const auto duration = now - LastUsedStorageMeterTimestamp; @@ -408,6 +408,8 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) UsersInfoStorage->Clear(ctx); } + Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill()); + Die(ctx); } @@ -572,7 +574,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext for (ui32 i = 0; i < 4; ++i) { resSpeed[i] += userInfo.AvgReadBytes[i].GetValue(); } - maxQuota += userInfo.ReadQuota.GetTotalSpeed(); + maxQuota += userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get(); } if (ev->Get()->ClientId == userInfo.User) { //fill lags NKikimrPQ::TClientInfo* clientInfo = result.MutableLagsInfo(); @@ -895,7 +897,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); } ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release()); - OnReadRequestFinished(std::move(info), answer.Size); + OnReadRequestFinished(cookie, answer.Size, info.User, ctx); } void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) { @@ -1056,12 +1058,6 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) { userInfo.LabeledCounters->GetCounters()[METRIC_USER_PARTITIONS].Set(1); } - ui64 speed = userInfo.ReadQuota.GetTotalSpeed(); - if (speed != userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_BYTES].Get()) { - haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_BYTES].Set(speed); - } - ui64 readOffsetRewindSum = userInfo.ReadOffsetRewindSum; if (readOffsetRewindSum != userInfo.LabeledCounters->GetCounters()[METRIC_READ_OFFSET_REWIND_SUM].Get()) { haveChanges = true; @@ -1079,17 +1075,18 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) { id += 2; } Y_VERIFY(id == METRIC_MAX_READ_SPEED_4 + 1); - if (userInfo.ReadQuota.GetTotalSpeed()) { - ui64 quotaUsage = ui64(userInfo.AvgReadBytes[1].GetValue()) * 1000000 / userInfo.ReadQuota.GetTotalSpeed() / 60; - if (quotaUsage != userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_USAGE].Get()) { + if (userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get()) { + ui64 quotaUsage = ui64(userInfo.AvgReadBytes[1].GetValue()) * 1000000 / userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get() / 60; + if (quotaUsage != userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get()) { haveChanges = true; - userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_USAGE].Set(quotaUsage); + userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage); } } if (haveChanges) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters)); } } + bool haveChanges = false; if (SourceIdStorage.GetInMemorySourceIds().size() != PartitionCountersLabeled->GetCounters()[METRIC_MAX_NUM_SIDS].Get()) { haveChanges = true; @@ -1189,6 +1186,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) { haveChanges = true; PartitionCountersLabeled->GetCounters()[METRIC_WRITE_TIME_LAG_MS].Set(timeLag); } + + if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get()) { + ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60; + if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) { + haveChanges = true; + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage); + } + } return haveChanges; } @@ -1198,6 +1203,16 @@ void TPartition::ReportCounters(const TActorContext& ctx) { } } +void TPartition::Handle(NReadQuoterEvents::TEvQuotaUpdated::TPtr& ev, const TActorContext&) { + for (auto& [consumerStr, quota] : ev->Get()->UpdatedConsumerQuotas) { + TUserInfo* userInfo = UsersInfoStorage->GetIfExists(consumerStr); + if (userInfo) { + userInfo->LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Set(quota); + } + } + PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Set(ev->Get()->UpdatedTotalPartitionQuota); +} + void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) { auto& response = ev->Get()->Record; @@ -1599,17 +1614,9 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf UsersInfoStorage->UpdateConfig(Config); - WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond()); - if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) { - for (auto& userInfo : UsersInfoStorage->GetAll()) { - userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2); - } - } + WriteQuota->UpdateConfigIfChanged(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond()); - for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { - auto& userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx); - userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); - } + Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); if (Config.GetPartitionConfig().HasMirrorFrom()) { if (Mirrorer) { diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 5784df51e90..612bb8631c3 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -10,6 +10,7 @@ #include "subscriber.h" #include "user_info.h" #include "utils.h" +#include "read_quoter.h" #include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h> @@ -108,8 +109,9 @@ private: void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); void FilterDeadlinedWrites(const TActorContext& ctx); - void Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx); - void Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx); + void Handle(NReadQuoterEvents::TEvQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx); + void Handle(NReadQuoterEvents::TEvQuotaUpdated::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvApproveQuota::TPtr& ev, const TActorContext& ctx); void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx); @@ -162,7 +164,7 @@ private: void LogAndCollectError(NKikimrServices::EServiceKikimr service, const TString& msg, const TActorContext& ctx); void LogAndCollectError(const NKikimrPQ::TStatusResponse::TErrorMessage& error, const TActorContext& ctx); - void OnReadRequestFinished(TReadInfo&& info, ui64 answerSize); + void OnReadRequestFinished(ui64 cookie, ui64 answerSize, const TString& consumer, const TActorContext& ctx); void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx); void ProcessChangeOwnerRequests(const TActorContext& ctx); @@ -386,13 +388,14 @@ private: HFuncTraced(TEvPersQueue::TEvReportPartitionError, Handle); HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle); HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle); - HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle); HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle); HFuncTraced(TEvPQ::TEvTxCalcPredicate, HandleOnInit); HFuncTraced(TEvPQ::TEvProposePartitionConfig, HandleOnInit); HFuncTraced(TEvPQ::TEvTxCommit, HandleOnInit); HFuncTraced(TEvPQ::TEvTxRollback, HandleOnInit); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); + HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); + HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); default: if (!Initializer.Handle(ev)) { ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev)); @@ -414,7 +417,7 @@ private: HFuncTraced(TEvPQ::TEvBlobResponse, Handle); HFuncTraced(TEvPQ::TEvWrite, HandleOnIdle); HFuncTraced(TEvPQ::TEvRead, Handle); - HFuncTraced(NReadSpeedLimiterEvents::TEvResponse, Handle); + HFuncTraced(TEvPQ::TEvApproveQuota, Handle); HFuncTraced(TEvPQ::TEvReadTimeout, Handle); HFuncTraced(TEvents::TEvPoisonPill, Handle); HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring); @@ -429,7 +432,6 @@ private: HFuncTraced(TEvPQ::TEvChangeOwner, Handle); HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle); HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle); - HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle); HFuncTraced(TEvPQ::TEvProxyResponse, Handle); HFuncTraced(TEvPQ::TEvError, Handle); HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle); @@ -447,7 +449,8 @@ private: HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); - + HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); + HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); default: ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev)); break; @@ -468,7 +471,7 @@ private: HFuncTraced(TEvPQ::TEvBlobResponse, Handle); HFuncTraced(TEvPQ::TEvWrite, HandleOnWrite); HFuncTraced(TEvPQ::TEvRead, Handle); - HFuncTraced(NReadSpeedLimiterEvents::TEvResponse, Handle); + HFuncTraced(TEvPQ::TEvApproveQuota, Handle); HFuncTraced(TEvPQ::TEvReadTimeout, Handle); HFuncTraced(TEvents::TEvPoisonPill, Handle); HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring); @@ -483,7 +486,6 @@ private: HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle); HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle); HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle); - HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle); HFuncTraced(TEvPQ::TEvProxyResponse, Handle); HFuncTraced(TEvPQ::TEvError, Handle); HFuncTraced(TEvPQ::TEvReserveBytes, Handle); @@ -501,7 +503,8 @@ private: HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); - + HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); + HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle); default: ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateWrite", ev)); break; @@ -626,7 +629,10 @@ private: TSet<THasDataDeadline> HasDataDeadlines; ui64 HasDataReqNum; + NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>> AvgReadBytes; + TMaybe<TQuotaTracker> WriteQuota; + TActorId ReadQuotaTrackerActor; THolder<TPercentileCounter> PartitionWriteQuotaWaitCounter; TInstant QuotaDeadline = TInstant::Zero(); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index d47b026edad..56cdd629e37 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -663,6 +663,16 @@ void TPartition::Initialize(const TActorContext& ctx) { WriteQuota.ConstructInPlace(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), ctx.Now()); + ReadQuotaTrackerActor = Register(new TReadQuoter( + ctx, + SelfId(), + TopicConverter, + Config, + Partition, + Tablet, + TabletID, + Counters + )); WriteTimestamp = ctx.Now(); LastUsedStorageMeterTimestamp = ctx.Now(); WriteTimestampEstimate = ManageWriteTimestampEstimate ? ctx.Now() : TInstant::Zero(); @@ -681,10 +691,8 @@ void TPartition::Initialize(const TActorContext& ctx) { TopicWriteQuotaResourcePath); UsersInfoStorage.ConstructInPlace(DCId, - TabletID, TopicConverter, Partition, - Counters, Config, CloudId, DbId, @@ -730,11 +738,6 @@ void TPartition::Initialize(const TActorContext& ctx) { DataKeysHead.push_back(TKeyLevel(CompactLevelBorder[i])); } - for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { - auto &userInfo = UsersInfoStorage->GetOrCreate(readQuota.GetClientId(), ctx); - userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); - } - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "bootstrapping " << Partition << " " << ctx.SelfID); if (AppData(ctx)->Counters) { diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 6b7e14c0327..7a30acc2ec9 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -128,20 +128,6 @@ void TPartition::UpdateAvailableSize(const TActorContext& ctx) { auto now = ctx.Now(); WriteQuota->Update(now); - for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { - while (true) { - if (!userInfo.ReadQuota.CanExaust(now) && !userInfo.ReadRequests.empty()) { - break; - } - if (!userInfo.ReadRequests.empty()) { - auto ri(std::move(userInfo.ReadRequests.front().first)); - auto cookie = userInfo.ReadRequests.front().second; - userInfo.ReadRequests.pop_front(); - ProcessRead(ctx, std::move(ri), cookie, false); - } else - break; - } - } ScheduleUpdateAvailableSize(ctx); } @@ -176,13 +162,8 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont } } -void TPartition::Handle(NReadSpeedLimiterEvents::TEvCounters::TPtr& ev, const TActorContext& /*ctx*/) { - auto userInfo = UsersInfoStorage->GetIfExists(ev->Get()->User); - if (userInfo && userInfo->ReadSpeedLimiter) { - auto diff = ev->Get()->Counters.MakeDiffForAggr(userInfo->ReadSpeedLimiter->Baseline); - TabletCounters.Populate(*diff.Get()); - ev->Get()->Counters.RememberCurrentStateAsBaseline(userInfo->ReadSpeedLimiter->Baseline); - } +void TPartition::Handle(NReadQuoterEvents::TEvQuotaCountersUpdated::TPtr& ev, const TActorContext& /*ctx*/) { + TabletCounters.Populate(*ev->Get()->Counters.Get()); } void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { @@ -513,7 +494,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct auto& userInfo = UsersInfoStorage->GetOrCreate(res->User, ctx); userInfo.ForgetSubscription(ctx.Now()); - OnReadRequestFinished(std::move(res.GetRef()), answer.Size); + OnReadRequestFinished(res->Destination, answer.Size, res->User, ctx); } @@ -678,30 +659,31 @@ void TPartition::Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx) { return; } } - - if (userInfo.ReadSpeedLimiter) { - Send(userInfo.ReadSpeedLimiter->Actor, new NReadSpeedLimiterEvents::TEvRequest(ev.Release())); - } else { - DoRead(ev.Release(), TDuration::Zero(), ctx); - } + userInfo.ReadsInQuotaQueue++; + Send(ReadQuotaTrackerActor, new TEvPQ::TEvRequestQuota(ev.Release())); } -void TPartition::Handle(NReadSpeedLimiterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx) { +void TPartition::Handle(TEvPQ::TEvApproveQuota::TPtr& ev, const TActorContext& ctx) { DoRead(ev->Get()->ReadRequest.Release(), ev->Get()->WaitTime, ctx); } void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx) { auto read = ev->Get(); const TString& user = read->ClientId; - auto& userInfo = UsersInfoStorage->GetOrCreate(user, ctx); - + auto userInfo = UsersInfoStorage->GetIfExists(user); + if(!userInfo) { + ReplyError(ctx, read->Cookie, NPersQueue::NErrorCode::BAD_REQUEST, TStringBuilder() << "cannot finish read request. Consumer " << read->ClientId << " is gone from partition"); + Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user)); + return; + } + userInfo->ReadsInQuotaQueue--; ui64 offset = read->Offset; - if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo.ReadFromTimestamp > TInstant::MilliSeconds(1))) { + if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) { TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero(); timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs)); - timestamp = Max(timestamp, userInfo.ReadFromTimestamp); + timestamp = Max(timestamp, userInfo->ReadFromTimestamp); offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset); - userInfo.ReadOffsetRewindSum += offset - read->Offset; + userInfo->ReadOffsetRewindSum += offset - read->Offset; } TReadInfo info(user, read->ClientDC, offset, read->PartNo, read->Count, read->Size, read->Cookie, read->ReadTimestampMs, waitQuotaTime); @@ -729,8 +711,8 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const read->Timeout = 30000; } Subscriber.AddSubscription(std::move(info), read->Timeout, cookie, ctx); - ++userInfo.Subscriptions; - userInfo.UpdateReadOffset((i64)offset - 1, userInfo.WriteTimestamp, userInfo.CreateTimestamp, ctx.Now()); + ++userInfo->Subscriptions; + userInfo->UpdateReadOffset((i64)offset - 1, userInfo->WriteTimestamp, userInfo->CreateTimestamp, ctx.Now()); return; } @@ -740,20 +722,9 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const ProcessRead(ctx, std::move(info), cookie, false); } -void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) { - auto userInfo = UsersInfoStorage->GetIfExists(info.User); - Y_VERIFY(userInfo); - - if (Config.GetMeteringMode() != NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY) { - return; - } - - if (userInfo->ReadSpeedLimiter) { - Send( - userInfo->ReadSpeedLimiter->Actor, - new NReadSpeedLimiterEvents::TEvConsumed(answerSize, info.Destination) - ); - } +void TPartition::OnReadRequestFinished(ui64 cookie, ui64 answerSize, const TString& consumer, const TActorContext& ctx) { + AvgReadBytes.Update(answerSize, ctx.Now()); + Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumed(answerSize, cookie, consumer)); } void TPartition::ReadTimestampForOffset(const TString& user, TUserInfo& userInfo, const TActorContext& ctx) { @@ -954,6 +925,7 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) } UsersInfoStorage->Remove(user, ctx); + Send(ReadQuotaTrackerActor, new TEvPQ::TEvConsumerRemoved(user)); } } @@ -989,11 +961,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u userInfo.ForgetSubscription(ctx.Now()); } - if (!userInfo.ReadQuota.CanExaust(ctx.Now())) { - userInfo.ReadRequests.push_back({std::move(info), cookie}); - userInfo.UpdateReadingTimeAndState(ctx.Now()); - return; - } TVector<TRequestedBlob> blobs = GetReadRequestFromBody(info.Offset, info.PartNo, info.Count, info.Size, &count, &size); info.Blobs = blobs; ui64 lastOffset = info.Offset + Min(count, info.Count); @@ -1025,7 +992,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds()); TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp.ByteSize()); ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release()); - OnReadRequestFinished(std::move(info), answer.Size); + OnReadRequestFinished(cookie, answer.Size, info.User, ctx); return; } diff --git a/ydb/core/persqueue/quota_tracker.cpp b/ydb/core/persqueue/quota_tracker.cpp index 9d8d0722cf0..11dd188424a 100644 --- a/ydb/core/persqueue/quota_tracker.cpp +++ b/ydb/core/persqueue/quota_tracker.cpp @@ -9,10 +9,14 @@ namespace NKikimr::NPQ { , MaxBurst(maxBurst)
{}
- void TQuotaTracker::UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
- SpeedPerSecond = speedPerSecond;
- MaxBurst = maxBurst;
- AvailableSize = maxBurst;
+ bool TQuotaTracker::UpdateConfigIfChanged(const ui64 maxBurst, const ui64 speedPerSecond) {
+ if (maxBurst != MaxBurst || speedPerSecond != SpeedPerSecond) {
+ SpeedPerSecond = speedPerSecond;
+ MaxBurst = maxBurst;
+ AvailableSize = maxBurst;
+ return true;
+ }
+ return false;
}
void TQuotaTracker::Update(const TInstant timestamp) {
diff --git a/ydb/core/persqueue/quota_tracker.h b/ydb/core/persqueue/quota_tracker.h index 82dcfba9ecc..f5a004b2ccc 100644 --- a/ydb/core/persqueue/quota_tracker.h +++ b/ydb/core/persqueue/quota_tracker.h @@ -8,7 +8,7 @@ namespace NKikimr::NPQ { public:
TQuotaTracker(const ui64 maxBurst, const ui64 speedPerSecond, const TInstant timestamp);
- void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond);
+ bool UpdateConfigIfChanged(const ui64 maxBurst, const ui64 speedPerSecond);
void Update(const TInstant timestamp);
bool CanExaust(const TInstant timestamp) ;
diff --git a/ydb/core/persqueue/read_quoter.cpp b/ydb/core/persqueue/read_quoter.cpp new file mode 100644 index 00000000000..2ec3c939759 --- /dev/null +++ b/ydb/core/persqueue/read_quoter.cpp @@ -0,0 +1,224 @@ +#include "read_quoter.h" +#include "account_read_quoter.h" + + +namespace NKikimr { +namespace NPQ { + +const TDuration WAKE_UP_TIMEOUT = TDuration::Seconds(5); +const ui64 DEFAULT_READ_SPEED_AND_BURST = 1'000'000'000; + +void TReadQuoter::Bootstrap(const TActorContext &ctx) { + Become(&TThis::StateWork); + ScheduleWakeUp(ctx); + + //UpdateConsumersWithCustomQuota(ctx); // depricated. Delete this after 01.10.2023 +} + +void TReadQuoter::HandleQuotaRequest(TEvPQ::TEvRequestQuota::TPtr& ev, const TActorContext& ctx) { + TConsumerReadQuota* consumer = GetOrCreateConsumerQuota(ev->Get()->ReadRequest->Get()->ClientId, ctx); + QuotaRequestedTimes.emplace(ev->Get()->ReadRequest->Cookie, ctx.Now()); + + if (consumer->ReadSpeedLimiter) { + Send(consumer->ReadSpeedLimiter->Actor, new NAccountReadQuoterEvents::TEvRequest(ev->Get()->ReadRequest.Release())); + } else { + CheckConsumerPerPartitionQuota(ev->Get()->ReadRequest, ctx); + } +} + +void TReadQuoter::HandleAccountQuotaApproved(NAccountReadQuoterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx) { + CheckConsumerPerPartitionQuota(ev->Get()->ReadRequest, ctx); +} + +void TReadQuoter::CheckConsumerPerPartitionQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx) { + TConsumerReadQuota* consumerQuota = GetOrCreateConsumerQuota(ev->Get()->ClientId, ctx); + if (!consumerQuota->PartitionPerConsumerQuotaTracker.CanExaust(ctx.Now())) { + consumerQuota->ReadRequests.push_back(ev); + return; + } + CheckTotalPartitionQuota(ev, ctx); +} + +void TReadQuoter::CheckTotalPartitionQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx) { + if (!PartitionTotalQuotaTracker.CanExaust(ctx.Now())) { + ReadRequests.push_back(ev); + return; + } + ApproveQuota(ev, ctx); +} + +void TReadQuoter::ApproveQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx) { + auto waitTime = TDuration::Zero(); + auto waitTimeIter = QuotaRequestedTimes.find(ev->Get()->Cookie); + if (waitTimeIter != QuotaRequestedTimes.end()) { + waitTime = ctx.Now() - waitTimeIter->second; + QuotaRequestedTimes.erase(waitTimeIter); + } + Send(PartitionActor, new TEvPQ::TEvApproveQuota(ev, waitTime)); +} + +void TReadQuoter::HandleWakeUp(TEvents::TEvWakeup::TPtr&, const TActorContext& ctx) { + for (auto& [consumerStr, consumer] : ConsumerQuotas) { + while (!consumer.ReadRequests.empty() && consumer.PartitionPerConsumerQuotaTracker.CanExaust(ctx.Now())) { + auto readEvent(std::move(consumer.ReadRequests.front())); + consumer.ReadRequests.pop_front(); + CheckTotalPartitionQuota(readEvent, ctx); + } + } + while (!ReadRequests.empty() && PartitionTotalQuotaTracker.CanExaust(ctx.Now())) { + auto readEvent(std::move(ReadRequests.front())); + ReadRequests.pop_front(); + ApproveQuota(readEvent, ctx); + } + ScheduleWakeUp(ctx); +} + +void TReadQuoter::HandleConsumerRemoved(TEvPQ::TEvConsumerRemoved::TPtr& ev, const TActorContext&) { + auto it = ConsumerQuotas.find(ev->Get()->Consumer); + if(it != ConsumerQuotas.end()) { + if(it->second.ReadSpeedLimiter) { + Send(it->second.ReadSpeedLimiter->Actor, new TEvents::TEvPoisonPill()); + } + ConsumerQuotas.erase(it); + } +} + +void TReadQuoter::HandleConsumed(TEvPQ::TEvConsumed::TPtr& ev,const TActorContext& ctx) { + PartitionTotalQuotaTracker.Exaust(ev->Get()->ReadBytes, ctx.Now()); + auto consumerQuota = GetIfExists(ev->Get()->Consumer); + if(!consumerQuota) + return; + if (consumerQuota->ReadSpeedLimiter) { + Send( + consumerQuota->ReadSpeedLimiter->Actor, + new NAccountReadQuoterEvents::TEvConsumed(ev->Get()->ReadBytes, ev->Get()->ReadRequestCookie) + ); + } + consumerQuota->PartitionPerConsumerQuotaTracker.Exaust(ev->Get()->ReadBytes, ctx.Now()); +} + +void TReadQuoter::HandleConfigUpdate(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) { + PQTabletConfig = ev->Get()->Config; + UpdateQuota(ctx); + //UpdateConsumersWithCustomQuota(ctx); // depricated. Delete this after 01.10.2023 +} + +void TReadQuoter::HandleUpdateAccountQuotaCounters(NAccountReadQuoterEvents::TEvCounters::TPtr& ev, const TActorContext&) { + auto consumerQuota = GetIfExists(ev->Get()->User); + if(!consumerQuota) + return; + if (consumerQuota->ReadSpeedLimiter) { + auto diff = ev->Get()->Counters.MakeDiffForAggr(consumerQuota->ReadSpeedLimiter->Baseline); + ev->Get()->Counters.RememberCurrentStateAsBaseline(consumerQuota->ReadSpeedLimiter->Baseline); + Send(PartitionActor, new NReadQuoterEvents::TEvQuotaCountersUpdated(diff)); + } +} + +void TReadQuoter::HandlePoisonPill(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) { + for (auto& consumerQuota : ConsumerQuotas) { + if(consumerQuota.second.ReadSpeedLimiter) { + Send(consumerQuota.second.ReadSpeedLimiter->Actor, new TEvents::TEvPoisonPill()); + } + } + ConsumerQuotas.clear(); + Die(ctx); +} + +void TReadQuoter::UpdateQuota(const TActorContext &ctx) { + TVector<std::pair<TString, ui64>> updatedQuotas; + for (auto& [consumerStr, consumerQuota] : ConsumerQuotas) { + if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged( + GetConsumerReadBurst(ctx), + GetConsumerReadSpeed(ctx))) { + updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()}); + } + } + auto totalQuotaUpdated = PartitionTotalQuotaTracker.UpdateConfigIfChanged(GetTotalPartitionReadBurst(ctx), GetTotalPartitionReadSpeed(ctx)); + if (updatedQuotas.size() || totalQuotaUpdated) { + Send(PartitionActor, new NReadQuoterEvents::TEvQuotaUpdated(updatedQuotas, PartitionTotalQuotaTracker.GetTotalSpeed())); + } +} + +void TReadQuoter::UpdateConsumersWithCustomQuota(const TActorContext &ctx) { + TVector<std::pair<TString, ui64>> updatedQuotas; + for (const auto& readQuota : PQTabletConfig.GetPartitionConfig().GetReadQuota()) { + auto consumerQuota = GetOrCreateConsumerQuota(readQuota.GetClientId(), ctx); + if (consumerQuota->PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond())) { + updatedQuotas.push_back({readQuota.GetClientId(), consumerQuota->PartitionPerConsumerQuotaTracker.GetTotalSpeed()}); + } + } + if (updatedQuotas.size()) { + Send(PartitionActor, new NReadQuoterEvents::TEvQuotaUpdated(updatedQuotas, PartitionTotalQuotaTracker.GetTotalSpeed())); + } +} + +ui64 TReadQuoter::GetConsumerReadSpeed(const TActorContext& ctx) const { + return AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota() ? + PQTabletConfig.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2 + : DEFAULT_READ_SPEED_AND_BURST; +} + +ui64 TReadQuoter::GetConsumerReadBurst(const TActorContext& ctx) const { + return AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota() ? + PQTabletConfig.GetPartitionConfig().GetBurstSize() * 2 + : DEFAULT_READ_SPEED_AND_BURST; +} + +ui64 TReadQuoter::GetTotalPartitionReadSpeed(const TActorContext& ctx) const { + auto consumersPerPartition = AppData(ctx)->PQConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition(); + return GetConsumerReadSpeed(ctx) * consumersPerPartition; +} + +ui64 TReadQuoter::GetTotalPartitionReadBurst(const TActorContext& ctx) const { + auto consumersPerPartition = AppData(ctx)->PQConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition(); + return GetConsumerReadBurst(ctx) * consumersPerPartition; +} + +TConsumerReadQuota* TReadQuoter::GetOrCreateConsumerQuota(const TString& consumerStr, const TActorContext& ctx) { + Y_VERIFY(!consumerStr.empty()); + auto it = ConsumerQuotas.find(consumerStr); + if (it == ConsumerQuotas.end()) { + TConsumerReadQuota consumer(CreateReadSpeedLimiter(consumerStr), GetConsumerReadBurst(ctx), GetConsumerReadSpeed(ctx)); + auto result = ConsumerQuotas.emplace(consumerStr, std::move(consumer)); + Y_VERIFY(result.second); + Send(PartitionActor, new NReadQuoterEvents::TEvQuotaUpdated({{consumerStr, consumer.PartitionPerConsumerQuotaTracker.GetTotalSpeed()}}, PartitionTotalQuotaTracker.GetTotalSpeed())); + return &result.first->second; + } + return &it->second; +} + +TConsumerReadQuota* TReadQuoter::GetIfExists(const TString& consumerStr) { + auto it = ConsumerQuotas.find(consumerStr); + return it != ConsumerQuotas.end() ? &it->second : nullptr; +} + +void TReadQuoter::ScheduleWakeUp(const TActorContext& ctx) { + ctx.Schedule(WAKE_UP_TIMEOUT, new TEvents::TEvWakeup()); +} + +THolder<TAccountReadQuoterHolder> TReadQuoter::CreateReadSpeedLimiter(const TString& user) const { + const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig(); + if (TabletActor && quotingConfig.GetEnableQuoting() && quotingConfig.GetEnableReadQuoting()) { + TActorId actorId = TActivationContext::Register( + new TAccountReadQuoter( + TabletActor, + SelfId(), + TabletId, + TopicConverter, + Partition, + user, + Counters + ), + PartitionActor + ); + return MakeHolder<TAccountReadQuoterHolder>(actorId, Counters); + } + return nullptr; +} + +TQuotaTracker TReadQuoter::CreatePartitionTotalQuotaTracker(const TActorContext& ctx) const { + return {GetTotalPartitionReadBurst(ctx), GetTotalPartitionReadSpeed(ctx), ctx.Now()}; +} + +}// NPQ +}// NKikimr diff --git a/ydb/core/persqueue/read_quoter.h b/ydb/core/persqueue/read_quoter.h new file mode 100644 index 00000000000..6b87bc0605a --- /dev/null +++ b/ydb/core/persqueue/read_quoter.h @@ -0,0 +1,150 @@ +#pragma once + +#include "quota_tracker.h" +#include "account_read_quoter.h" +#include "user_info.h" + +#include <ydb/core/quoter/public/quoter.h> +#include <ydb/core/persqueue/events/internal.h> + +#include <library/cpp/actors/core/hfunc.h> + + +namespace NKikimr { +namespace NPQ { + +namespace NReadQuoterEvents { + +struct TEvQuotaUpdated : public TEventLocal<TEvQuotaUpdated, TEvPQ::EvQuotaUpdated> { + TEvQuotaUpdated(TVector<std::pair<TString, ui64>> updatedConsumerQuotas, ui64 updatedTotalPartitionQuota) + : UpdatedConsumerQuotas(std::move(updatedConsumerQuotas)), + UpdatedTotalPartitionQuota(updatedTotalPartitionQuota) + {} + + TVector<std::pair<TString, ui64>> UpdatedConsumerQuotas; + ui64 UpdatedTotalPartitionQuota; +}; + +struct TEvQuotaCountersUpdated : public TEventLocal<TEvQuotaCountersUpdated, TEvPQ::EvQuotaCountersUpdated> { + TEvQuotaCountersUpdated(TAutoPtr<TTabletCountersBase> counters) + : Counters(std::move(counters)) + {} + + TAutoPtr<TTabletCountersBase> Counters; +}; + +}// NReadQuoterEvents + +struct TAccountReadQuoterHolder { + TAccountReadQuoterHolder(const TActorId& actor, const TTabletCountersBase& baseline) + : Actor(actor) + { + Baseline.Populate(baseline); + } + + TActorId Actor; + TTabletCountersBase Baseline; +}; + +class TConsumerReadQuota { + public: + TConsumerReadQuota(THolder<TAccountReadQuoterHolder> readSpeedLimiter, ui64 readQuotaBurst, ui64 readQuotaSpeed): + PartitionPerConsumerQuotaTracker(readQuotaBurst, readQuotaSpeed, TAppData::TimeProvider->Now()), + ReadSpeedLimiter(std::move(readSpeedLimiter)) + { } + public: + TQuotaTracker PartitionPerConsumerQuotaTracker; + THolder<TAccountReadQuoterHolder> ReadSpeedLimiter; + std::deque<TEvPQ::TEvRead::TPtr> ReadRequests; +}; + +class TReadQuoter : public TActorBootstrapped<TReadQuoter> { +public: + TReadQuoter( + const TActorContext& ctx, + TActorId partitionActor, + const NPersQueue::TTopicConverterPtr& topicConverter, + const NKikimrPQ::TPQTabletConfig& config, + ui32 partition, + TActorId tabletActor, + ui64 tabletId, + const TTabletCountersBase& counters + ): + TabletActor(tabletActor), + PartitionActor(partitionActor), + PQTabletConfig(config), + PartitionTotalQuotaTracker(CreatePartitionTotalQuotaTracker(ctx)), + TopicConverter(topicConverter), + Partition(partition), + TabletId(tabletId) + { + Counters.Populate(counters); + } + +private: + STFUNC(StateWork) + { + switch (ev->GetTypeRewrite()) { + HFunc(TEvPQ::TEvRequestQuota, HandleQuotaRequest); + HFunc(TEvents::TEvWakeup, HandleWakeUp); + HFunc(NAccountReadQuoterEvents::TEvResponse, HandleAccountQuotaApproved); + HFunc(TEvPQ::TEvConsumed, HandleConsumed); + HFunc(TEvPQ::TEvChangePartitionConfig, HandleConfigUpdate); + HFunc(NAccountReadQuoterEvents::TEvCounters, HandleUpdateAccountQuotaCounters); + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvPQ::TEvConsumerRemoved, HandleConsumerRemoved); + default: + break; + }; + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::PERSQUEUE_READ_QUOTER; + } + + void Bootstrap(const TActorContext &ctx); + + void HandleQuotaRequest(TEvPQ::TEvRequestQuota::TPtr& ev,const TActorContext& ctx); + void HandleAccountQuotaApproved(NAccountReadQuoterEvents::TEvResponse::TPtr& ev, const TActorContext& ctx); + void HandleWakeUp(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); + void HandleConsumed(TEvPQ::TEvConsumed::TPtr& ev, const TActorContext& ctx); + void HandlePoisonPill(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); + void HandleConfigUpdate(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx); + void HandleUpdateAccountQuotaCounters(NAccountReadQuoterEvents::TEvCounters::TPtr& ev, const TActorContext& ctx); + void HandleConsumerRemoved(TEvPQ::TEvConsumerRemoved::TPtr& ev, const TActorContext& ctx); + +private: + TConsumerReadQuota* GetOrCreateConsumerQuota(const TString& consumerStr, const TActorContext& ctx); + THolder<TAccountReadQuoterHolder> CreateReadSpeedLimiter(const TString& user) const; + TQuotaTracker CreatePartitionTotalQuotaTracker(const TActorContext& ctx) const; + void CheckConsumerPerPartitionQuota(TEvPQ::TEvRead::TPtr, const TActorContext& ctx); + void CheckTotalPartitionQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx); + void ApproveQuota(TEvPQ::TEvRead::TPtr ev, const TActorContext& ctx); + void ScheduleWakeUp(const TActorContext& ctx); + void UpdateConsumersWithCustomQuota(const TActorContext &ctx); + void UpdateQuota(const TActorContext &ctx); + void UpdateCounters(const TVector<std::pair<TString, ui64>>& updatedConsumerQuotas); + TConsumerReadQuota* GetIfExists(const TString& consumerStr); + ui64 GetConsumerReadSpeed(const TActorContext& ctx) const; + ui64 GetConsumerReadBurst(const TActorContext& ctx) const; + ui64 GetTotalPartitionReadSpeed(const TActorContext& ctx) const; + ui64 GetTotalPartitionReadBurst(const TActorContext& ctx) const; + +private: + TActorId TabletActor; + TActorId PartitionActor; + THashMap<TString, TConsumerReadQuota> ConsumerQuotas; + THashMap<ui64, TInstant> QuotaRequestedTimes; + std::deque<TEvPQ::TEvRead::TPtr> ReadRequests; + NKikimrPQ::TPQTabletConfig PQTabletConfig; + TQuotaTracker PartitionTotalQuotaTracker; + NPersQueue::TTopicConverterPtr TopicConverter; + const ui32 Partition; + ui64 TabletId; + TTabletCountersBase Counters; +}; + + +}// NPQ +}// NKikimr diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp index b64c92609f7..317a761f20d 100644 --- a/ydb/core/persqueue/user_info.cpp +++ b/ydb/core/persqueue/user_info.cpp @@ -39,10 +39,8 @@ namespace NDeprecatedUserData { TUsersInfoStorage::TUsersInfoStorage( TString dcId, - ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, ui32 partition, - const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config, const TString& cloudId, const TString& dbId, @@ -51,7 +49,6 @@ TUsersInfoStorage::TUsersInfoStorage( const TString& folderId ) : DCId(std::move(dcId)) - , TabletId(tabletId) , TopicConverter(topicConverter) , Partition(partition) , Config(config) @@ -62,7 +59,6 @@ TUsersInfoStorage::TUsersInfoStorage( , FolderId(folderId) , CurReadRuleGeneration(0) { - Counters.Populate(counters); } void TUsersInfoStorage::Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx) { @@ -141,10 +137,9 @@ void TUsersInfoStorage::Parse(const TString& key, const TString& data, const TAc userInfo->Parsed = true; } -void TUsersInfoStorage::Remove(const TString& user, const TActorContext& ctx) { +void TUsersInfoStorage::Remove(const TString& user, const TActorContext&) { auto it = UsersInfo.find(user); Y_VERIFY(it != UsersInfo.end()); - it->second.Clear(ctx); UsersInfo.erase(it); } @@ -174,12 +169,6 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx, ui32 gen, ui32 step, i64 offset, ui64 readOffsetRewindSum, TInstant readFromTimestamp) const { - ui64 burst = 1'000'000'000, speed = 1'000'000'000; - if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) { - burst = Config.GetPartitionConfig().GetBurstSize() * 2; - speed = Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2; - } - TString defaultServiceType = AppData(ctx)->PQConfig.GetDefaultClientServiceType().GetName(); TString userServiceType = ""; for (ui32 i = 0; i < Config.ReadRulesSize(); ++i) { @@ -193,9 +182,10 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx, return { - ctx, StreamCountersSubgroup, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition, + ctx, StreamCountersSubgroup, + user, readRuleGeneration, important, TopicConverter, Partition, session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, DbPath, - meterRead, burst, speed + meterRead }; } @@ -216,38 +206,9 @@ TUserInfo& TUsersInfoStorage::Create( return result.first->second; } -void TUsersInfoStorage::Clear(const TActorContext& ctx) { - for (auto& userInfoPair : UsersInfo) { - userInfoPair.second.Clear(ctx); - } +void TUsersInfoStorage::Clear(const TActorContext&) { UsersInfo.clear(); } -void TUserInfo::Clear(const TActorContext& ctx) { - if (ReadSpeedLimiter) { - ctx.Send(ReadSpeedLimiter->Actor, new TEvents::TEvPoisonPill()); - } -} - -THolder<TReadSpeedLimiterHolder> TUsersInfoStorage::CreateReadSpeedLimiter(const TString& user) const { - const auto& quotingConfig = AppData()->PQConfig.GetQuotingConfig(); - if (TabletActor && quotingConfig.GetEnableQuoting() && quotingConfig.GetEnableReadQuoting()) { - TActorId actorId = TActivationContext::Register( - new TReadSpeedLimiter( - TabletActor.GetRef(), - PartitionActor.GetRef(), - TabletId, - TopicConverter, - Partition, - user, - Counters - ), - PartitionActor.GetRef() - ); - return MakeHolder<TReadSpeedLimiterHolder>(actorId, Counters); - } - return nullptr; -} - } //NPQ } //NKikimr diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 8e5faf9d0e8..d76b4765013 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -4,7 +4,7 @@ #include "subscriber.h" #include "percentile_counter.h" #include "quota_tracker.h" -#include "read_speed_limiter.h" +#include "account_read_quoter.h" #include "metering_sink.h" #include <ydb/core/base/counters.h> @@ -36,17 +36,6 @@ static const TString CLIENTID_WITHOUT_CONSUMER = "$without_consumer"; typedef TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor> TUserLabeledCounters; -struct TReadSpeedLimiterHolder { - TReadSpeedLimiterHolder(const TActorId& actor, const TTabletCountersBase& baseline) - : Actor(actor) - { - Baseline.Populate(baseline); - } - - TActorId Actor; - TTabletCountersBase Baseline; -}; - struct TUserInfoBase { TString User; ui64 ReadRuleGeneration = 0; @@ -61,8 +50,6 @@ struct TUserInfoBase { }; struct TUserInfo: public TUserInfoBase { - THolder<TReadSpeedLimiterHolder> ReadSpeedLimiter; - TInstant WriteTimestamp; TInstant CreateTimestamp; TInstant ReadTimestamp; @@ -83,16 +70,13 @@ struct TUserInfo: public TUserInfoBase { THolder<TUserLabeledCounters> LabeledCounters; NPersQueue::TTopicConverterPtr TopicConverter; - std::deque<std::pair<TReadInfo, ui64>> ReadRequests; - - TQuotaTracker ReadQuota; - TWorkingTimeCounter Counter; NKikimr::NPQ::TMultiCounter BytesRead; NKikimr::NPQ::TMultiCounter MsgsRead; TMap<TString, NKikimr::NPQ::TMultiCounter> BytesReadFromDC; ui32 ActiveReads; + ui32 ReadsInQuotaQueue; ui32 Subscriptions; i64 EndOffset; @@ -113,7 +97,7 @@ struct TUserInfo: public TUserInfoBase { } void UpdateReadingState() { - Counter.UpdateState(Subscriptions > 0 || ActiveReads > 0 || ReadRequests.size() > 0); //no data for read or got read requests from client + Counter.UpdateState(Subscriptions > 0 || ActiveReads > 0 || ReadsInQuotaQueue > 0); //no data for read or got read requests from client } void UpdateReadingTimeAndState(TInstant now) { @@ -155,7 +139,6 @@ struct TUserInfo: public TUserInfoBase { if (it != BytesReadFromDC.end()) it->second.Inc(readSize); } - ReadQuota.Exaust(readSize, now); for (auto& avg : AvgReadBytes) { avg.Update(readSize, now); } @@ -167,15 +150,14 @@ struct TUserInfo: public TUserInfoBase { TUserInfo( const TActorContext& ctx, - NMonitoring::TDynamicCounterPtr streamCountersSubgroup, THolder<TReadSpeedLimiterHolder> readSpeedLimiter, const TString& user, + NMonitoring::TDynamicCounterPtr streamCountersSubgroup, + const TString& user, const ui64 readRuleGeneration, const bool important, const NPersQueue::TTopicConverterPtr& topicConverter, const ui32 partition, const TString &session, ui32 gen, ui32 step, i64 offset, const ui64 readOffsetRewindSum, const TString& dcId, TInstant readFromTimestamp, - const TString& dbPath, bool meterRead, - ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000 + const TString& dbPath, bool meterRead ) : TUserInfoBase{user, readRuleGeneration, session, gen, step, offset, important, readFromTimestamp} - , ReadSpeedLimiter(std::move(readSpeedLimiter)) , WriteTimestamp(TAppData::TimeProvider->Now()) , CreateTimestamp(TAppData::TimeProvider->Now()) , ReadTimestamp(TAppData::TimeProvider->Now()) @@ -187,9 +169,9 @@ struct TUserInfo: public TUserInfoBase { , ReadScheduled(false) , HasReadRule(false) , TopicConverter(topicConverter) - , ReadQuota(burst, speed, TAppData::TimeProvider->Now()) , Counter(nullptr) , ActiveReads(0) + , ReadsInQuotaQueue(0) , Subscriptions(0) , EndOffset(0) , AvgReadBytes{{TDuration::Seconds(1), 1000}, {TDuration::Minutes(1), 1000}, @@ -277,12 +259,6 @@ struct TUserInfo: public TUserInfoBase { } - void SetQuota(const ui64 maxBurst, const ui64 speed) { - ReadQuota.UpdateConfig(maxBurst, speed); - } - - void Clear(const TActorContext& ctx); - void UpdateReadOffset(const i64 offset, TInstant writeTimestamp, TInstant createTimestamp, TInstant now) { ReadOffset = offset; ReadWriteTimestamp = writeTimestamp; @@ -377,9 +353,16 @@ struct TUserInfo: public TUserInfoBase { class TUsersInfoStorage { public: - TUsersInfoStorage(TString dcId, ui64 tabletId, const NPersQueue::TTopicConverterPtr& topicConverter, ui32 partition, - const TTabletCountersBase& counters, const NKikimrPQ::TPQTabletConfig& config, - const TString& CloudId, const TString& DbId, const TString& DbPath, const bool isServerless, const TString& FolderId); + TUsersInfoStorage( + TString dcId, + const NPersQueue::TTopicConverterPtr& topicConverter, + ui32 partition, + const NKikimrPQ::TPQTabletConfig& config, + const TString& CloudId, + const TString& DbId, + const TString& DbPath, + const bool isServerless, + const TString& FolderId); void Init(TActorId tabletActor, TActorId partitionActor, const TActorContext& ctx); @@ -407,7 +390,6 @@ public: void Remove(const TString& user, const TActorContext& ctx); private: - THolder<TReadSpeedLimiterHolder> CreateReadSpeedLimiter(const TString& user) const; TUserInfo CreateUserInfo(const TActorContext& ctx, const TString& user, @@ -421,10 +403,8 @@ private: THashMap<TString, TUserInfo> UsersInfo; const TString DCId; - ui64 TabletId; NPersQueue::TTopicConverterPtr TopicConverter; const ui32 Partition; - TTabletCountersBase Counters; NMonitoring::TDynamicCounterPtr StreamCountersSubgroup; TMaybe<TActorId> TabletActor; diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index e3e2e27da00..c291086ed37 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -68,28 +68,28 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, tabletConfig->AddReadRules("user"); tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs); tabletConfig->SetMeteringMode(parameters.meteringMode); - auto config = tabletConfig->MutablePartitionConfig(); - if (parameters.speed > 0) { - config->SetWriteSpeedInBytesPerSecond(parameters.speed); - config->SetBurstSize(parameters.speed); + auto partitionConfig = tabletConfig->MutablePartitionConfig(); + if (parameters.writeSpeed > 0) { + partitionConfig->SetWriteSpeedInBytesPerSecond(parameters.writeSpeed); + partitionConfig->SetBurstSize(parameters.writeSpeed); } - config->SetMaxCountInPartition(parameters.maxCountInPartition); - config->SetMaxSizeInPartition(parameters.maxSizeInPartition); + partitionConfig->SetMaxCountInPartition(parameters.maxCountInPartition); + partitionConfig->SetMaxSizeInPartition(parameters.maxSizeInPartition); if (parameters.storageLimitBytes > 0) { - config->SetStorageLimitBytes(parameters.storageLimitBytes); + partitionConfig->SetStorageLimitBytes(parameters.storageLimitBytes); } else { - config->SetLifetimeSeconds(parameters.deleteTime); + partitionConfig->SetLifetimeSeconds(parameters.deleteTime); } - config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds()); + partitionConfig->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds()); if (parameters.sidMaxCount > 0) - config->SetSourceIdMaxCounts(parameters.sidMaxCount); - config->SetMaxWriteInflightSize(90'000'000); - config->SetLowWatermark(parameters.lowWatermark); + partitionConfig->SetSourceIdMaxCounts(parameters.sidMaxCount); + partitionConfig->SetMaxWriteInflightSize(90'000'000); + partitionConfig->SetLowWatermark(parameters.lowWatermark); for (auto& u : users) { if (u.second) - config->AddImportantClientId(u.first); + partitionConfig->AddImportantClientId(u.first); if (u.first != "user") tabletConfig->AddReadRules(u.first); } @@ -864,7 +864,7 @@ TVector<TString> CmdSourceIdRead(TTestContext& tc) { } -void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets, const ui32 maxTimeLagMs, const ui64 readTimestampMs) { +void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui32 size, const ui32 resCount, bool timeouted, TTestContext& tc, TVector<i32> offsets, const ui32 maxTimeLagMs, const ui64 readTimestampMs, const TString user) { TAutoPtr<IEventHandle> handle; TEvPersQueue::TEvResponse *result; THolder<TEvPersQueue::TEvRequest> request; @@ -877,7 +877,7 @@ void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui req->SetPartition(partition); auto read = req->MutableCmdRead(); read->SetOffset(offset); - read->SetClientId("user"); + read->SetClientId(user); read->SetCount(count); read->SetBytes(size); if (maxTimeLagMs > 0) { diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 003c89642a4..0f32885afcc 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -77,7 +77,6 @@ public: } }; - struct TTestContext { const TTabletTypes::EType PQTabletType = TTabletTypes::PersQueue; const TTabletTypes::EType BalancerTabletType = TTabletTypes::PersQueueReadBalancer; @@ -253,7 +252,7 @@ struct TTabletPreparationParameters { ui64 readFromTimestampsMs{0}; ui64 sidMaxCount{0}; ui32 specVersion{0}; - ui32 speed{0}; + ui32 writeSpeed{0}; i32 storageLimitBytes{0}; TString folderId{"somefolder"}; TString cloudId{"somecloud"}; @@ -458,7 +457,8 @@ void CmdRead( TTestContext& tc, TVector<i32> offsets = {}, const ui32 maxTimeLagMs = 0, - const ui64 readTimestampMs = 0); + const ui64 readTimestampMs = 0, + const TString user = "user"); void CmdReserveBytes( const ui32 partition, diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index ec85e940c66..dac1905f924 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -20,6 +20,70 @@ const static TString TOPIC_NAME = "rt3.dc1--topic"; Y_UNIT_TEST_SUITE(TPQTest) { +Y_UNIT_TEST(TestPartitionTotalQuota) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + tc.Runtime->SetScheduledLimit(1000); + + tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true); + tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetMaxParallelConsumersPerPartition(1); //total partition quota is equal to quota per consumer. Very low. + + PQTabletPrepare({.partitions = 1, .writeSpeed = 100_KB}, {{"important_user", true}}, tc); + TVector<std::pair<ui64, TString>> data; + TString s{2_MB, 'c'}; + data.push_back({1, s}); + CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true); + + //check throttling on total partition quota + auto startTime = tc.Runtime->GetTimeProvider()->Now(); + CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user1"); + CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user2"); + auto diff = (tc.Runtime->GetTimeProvider()->Now() - startTime).Seconds(); + UNIT_ASSERT(diff >= 9); //read quota is twice write quota. So, it's 200kb per seconds and 200kb burst. (2mb - 200kb) / 200kb = 9 seconds needed to get quota + }); +} + +Y_UNIT_TEST(TestPartitionPerConsumerQuota) { + TTestContext tc; + RunTestWithReboots(tc.TabletIds, [&]() { + return tc.InitialEventsFilter.Prepare(); + }, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) { + TFinalizer finalizer(tc); + tc.Prepare(dispatchName, setup, activeZone); + activeZone = false; + tc.Runtime->SetScheduledLimit(1000); + + tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true); + tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetMaxParallelConsumersPerPartition(1000); //total partition quota is 1 consumer quota * 1000. Very high. + + + PQTabletPrepare({.partitions = 1, .writeSpeed = 100_KB}, {{"important_user", true}}, tc); + TVector<std::pair<ui64, TString>> data; + TString s{2_MB, 'c'}; + data.push_back({1, s}); + CmdWrite(0, "sourceid0", data, tc, false, {}, false, "", -1, 0, false, false, true); + + //check throttling on per consumer quota + auto startTimeReadWithSameConsumer = tc.Runtime->GetTimeProvider()->Now(); + CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user1"); + CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user1"); + auto diffReadWithSameConsumers = (tc.Runtime->GetTimeProvider()->Now() - startTimeReadWithSameConsumer).Seconds(); + UNIT_ASSERT(diffReadWithSameConsumers >= 9); //read quota is twice write quota. So, it's 200kb per seconds and 200kb burst. (2mb - 200kb) / 200kb = 9 seconds needed to get quota + + //check not throttling on total partition quota + auto startTimeReadWithDifferentConsumers = tc.Runtime->GetTimeProvider()->Now(); + CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user2"); + CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc, {0}, 0, 0, "user3"); + auto diffReadWithDifferentConsumers = (tc.Runtime->GetTimeProvider()->Now() - startTimeReadWithDifferentConsumers).Seconds(); + UNIT_ASSERT(diffReadWithDifferentConsumers <= 1); //different consumers. No throttling + }); +} + Y_UNIT_TEST(TestGroupsBalancer) { TTestContext tc; TFinalizer finalizer(tc); @@ -1056,7 +1120,7 @@ void TestWritePQImpl(bool fast) { tc.Runtime->SetScheduledLimit(100); // Important client, lifetimeseconds=0 - never delete - PQTabletPrepare({.partitions = 2, .speed = 200000000}, {{"user", true}}, tc); + PQTabletPrepare({.partitions = 2, .writeSpeed = 200000000}, {{"user", true}}, tc); TVector<std::pair<ui64, TString>> data, data1, data2; activeZone = PlainOrSoSlow(true, false) && fast; diff --git a/ydb/core/persqueue/ut/resources/counters_labeled.json b/ydb/core/persqueue/ut/resources/counters_labeled.json index 85804aa58b8..28fd813977d 100644 --- a/ydb/core/persqueue/ut/resources/counters_labeled.json +++ b/ydb/core/persqueue/ut/resources/counters_labeled.json @@ -1817,24 +1817,6 @@ "labels": { "user_counters": "PersQueue", "topic": "rt3.dc1--asdfgs--topic", - "sensor": "PQ/ReserveLimitBytes" - }, - "value": 0 - }, - { - "kind": "GAUGE", - "labels": { - "user_counters": "PersQueue", - "topic": "rt3.dc1--asdfgs--topic", - "sensor": "PQ/ReserveUsedBytes" - }, - "value": 0 - }, - { - "kind": "GAUGE", - "labels": { - "user_counters": "PersQueue", - "topic": "rt3.dc1--asdfgs--topic", "sensor": "PQ/SourceIdCount" }, "value": 3 @@ -2141,24 +2123,6 @@ "labels": { "user_counters": "PersQueue", "topic": "total", - "sensor": "PQ/ReserveLimitBytes" - }, - "value": 0 - }, - { - "kind": "GAUGE", - "labels": { - "user_counters": "PersQueue", - "topic": "total", - "sensor": "PQ/ReserveUsedBytes" - }, - "value": 0 - }, - { - "kind": "GAUGE", - "labels": { - "user_counters": "PersQueue", - "topic": "total", "sensor": "PQ/SourceIdCount" }, "value": 3 diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html index 1bf45fdec3f..adbf9f9247b 100644 --- a/ydb/core/persqueue/ut/resources/counters_topics.html +++ b/ydb/core/persqueue/ut/resources/counters_topics.html @@ -13,6 +13,8 @@ host=: name=topic.partition.alive_count: 1 name=topic.partition.init_duration_milliseconds_max: 0 name=topic.partition.producers_count_max: 3 + name=topic.partition.read.speed_limit_bytes_per_second: 20000000000 + name=topic.partition.read.throttled_microseconds_max: 0 name=topic.partition.storage_bytes_max: 747 name=topic.partition.total_count: 2 name=topic.partition.uptime_milliseconds_min: 30000 @@ -22,7 +24,7 @@ host=: name=topic.partition.write.idle_milliseconds_max: 0 name=topic.partition.write.lag_milliseconds_max: 600 name=topic.partition.write.speed_limit_bytes_per_second: 50000000 - name=topic.partition.write.throttled_nanoseconds_max: 0 + name=topic.partition.write.throttled_microseconds_max: 0 name=topic.producers_count: 3 name=topic.reserve.limit_bytes: 0 name=topic.reserve.used_bytes: 0 @@ -37,6 +39,8 @@ host=: name=topic.partition.read.idle_milliseconds_max: 30000 name=topic.partition.read.lag_messages_max: 29 name=topic.partition.read.lag_milliseconds_max: 0 + name=topic.partition.read.speed_limit_bytes_per_second: 1000000000 + name=topic.partition.read.throttled_microseconds_max: 0 name=topic.partition.write.lag_milliseconds_max: 200 name=topic.read.lag_messages: 29 </pre> diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index 520b06b0789..b14b9184059 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -22,7 +22,8 @@ SRCS( pq_l2_cache.cpp quota_tracker.cpp read_balancer.cpp - read_speed_limiter.cpp + account_read_quoter.cpp + read_quoter.cpp sourceid.cpp subscriber.cpp transaction.cpp diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index 466de907f89..f242539fcf7 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -172,13 +172,13 @@ enum EClientLabeledCounters { METRIC_TOTAL_READ_SPEED_4 = 20 [(LabeledCounterOpts) = {Name: "ReadBytesPerDay" AggrFunc : EAF_SUM SVName: ""}]; METRIC_MAX_READ_SPEED_4 = 21 [(LabeledCounterOpts) = {Name: "ReadBytesMaxPerDay" AggrFunc : EAF_MAX SVName: ""}]; - METRIC_READ_QUOTA_BYTES = 22 [(LabeledCounterOpts) = {Name: "ReadBytesQuota" AggrFunc : EAF_MIN SVName: ""}]; + METRIC_READ_QUOTA_PER_CONSUMER_BYTES = 22 [(LabeledCounterOpts) = {Name: "ReadBytesQuota" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}]; METRIC_READ_TIME_LAG = 23 [(LabeledCounterOpts) = {Name: "ReadTimeLagMs" AggrFunc : EAF_MAX SVName: "topic.partition.read.lag_milliseconds_max"}]; METRIC_WRITE_TIME_LAG = 24 [(LabeledCounterOpts) = {Name: "WriteTimeLagMsByLastRead" AggrFunc : EAF_MAX SVName: "topic.partition.write.lag_milliseconds_max"}]; METRIC_LAST_READ_TIME = 25 [(LabeledCounterOpts) = {Name: "TimeSinceLastReadMs" AggrFunc : EAF_MIN Type : CT_TIMELAG SVName: "topic.partition.read.idle_milliseconds_max"}]; - METRIC_READ_QUOTA_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX SVName: ""}]; + METRIC_READ_QUOTA_PER_CONSUMER_USAGE = 26 [(LabeledCounterOpts) = {Name: "PartitionMaxReadQuotaUsage" AggrFunc : EAF_MAX SVName: "topic.partition.read.throttled_microseconds_max"}]; } @@ -224,11 +224,13 @@ enum EPartitionLabeledCounters { METRIC_MAX_QUOTA_SPEED_3 = 29 [(LabeledCounterOpts) = {Name: "QuotaBytesMaxPerHour" AggrFunc : EAF_MAX SVName: "topic.partition.write.bytes_per_hour_max"}]; METRIC_TOTAL_QUOTA_SPEED_4 = 30 [(LabeledCounterOpts) = {Name: "QuotaBytesPerDay" AggrFunc : EAF_SUM SVName: ""}]; METRIC_MAX_QUOTA_SPEED_4 = 31 [(LabeledCounterOpts) = {Name: "QuotaBytesMaxPerDay" AggrFunc : EAF_MAX SVName: "topic.partition.write.bytes_per_day_max"}]; - METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX SVName: "topic.partition.write.throttled_nanoseconds_max"}]; + METRIC_WRITE_QUOTA_USAGE = 32 [(LabeledCounterOpts) = {Name: "PartitionMaxWriteQuotaUsage" AggrFunc : EAF_MAX SVName: "topic.partition.write.throttled_microseconds_max"}]; METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN SVName: ""}]; METRIC_PARTITIONS_TOTAL = 34 [(LabeledCounterOpts) = {Name: "PartitionsTotal" AggrFunc : EAF_MAX SVName: "topic.partition.total_count"}]; - METRIC_RESERVE_LIMIT_BYTES = 35 [(LabeledCounterOpts) = {Name: "ReserveLimitBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.limit_bytes"}]; - METRIC_RESERVE_USED_BYTES = 36 [(LabeledCounterOpts) = {Name: "ReserveUsedBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.used_bytes"}]; + METRIC_RESERVE_LIMIT_BYTES = 35 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_SUM SVName: "topic.reserve.limit_bytes"}]; + METRIC_RESERVE_USED_BYTES = 36 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_SUM SVName: "topic.reserve.used_bytes"}]; + METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE = 37 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MAX SVName: "topic.partition.read.throttled_microseconds_max"}]; + METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES = 38 [(LabeledCounterOpts) = {Name: "" AggrFunc : EAF_MIN SVName: "topic.partition.read.speed_limit_bytes_per_second"}]; } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 1e3b26f44e4..e92d51cf9bb 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -80,6 +80,8 @@ message TPQConfig { optional bool PartitionReadQuotaIsTwiceWriteQuota = 7 [default = false]; + optional uint32 MaxParallelConsumersPerPartition = 8 [default = 20]; + } optional TQuotingConfig QuotingConfig = 18; diff --git a/ydb/core/tablet/private/aggregated_counters.cpp b/ydb/core/tablet/private/aggregated_counters.cpp index f4d0424a261..81acc72c393 100644 --- a/ydb/core/tablet/private/aggregated_counters.cpp +++ b/ydb/core/tablet/private/aggregated_counters.cpp @@ -545,6 +545,8 @@ void TAggregatedLabeledCounters::FillGetRequestV1( labeledCounters->SetGroup(group); labeledCounters->SetDelimiter("/"); //TODO: change here to "|" for (ui32 i = start; i < end; ++i) { + if (strlen(Names[i]) == 0 || strcmp(Names[i], "PQ/") == 0) + continue; auto& labeledCounter = *labeledCounters->AddLabeledCounter(); labeledCounter.SetValue(GetValue(i)); labeledCounter.SetId(GetId(i)); @@ -570,7 +572,7 @@ void TAggregatedLabeledCounters::FillGetRequestV2( labeledCounter.SetValue(GetValue(i)); labeledCounter.SetNameId(context->GetNameId(Names[i])); labeledCounter.SetAggregateFunc(NKikimr::TLabeledCounterOptions::EAggregateFunc(AggrFunc[i])); - labeledCounter.SetType(NKikimr::TLabeledCounterOptions::ECounterType(Types[i])); + labeledCounter.SetType(NKikimr::TLabeledCounterOptions::ECounterType(Types[i])); } } diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp index 4cbf9e46325..3f4549c7cda 100644 --- a/ydb/core/tablet/tablet_counters_aggregator.cpp +++ b/ydb/core/tablet/tablet_counters_aggregator.cpp @@ -157,6 +157,8 @@ public: } for (ui32 i = 0, e = labeledCounters->GetCounters().Size(); i < e; ++i) { + if(!strlen(labeledCounters->GetCounterName(i))) + continue; const ui64& value = labeledCounters->GetCounters()[i].Get(); const ui64& id = labeledCounters->GetIds()[i].Get(); iterTabletType->second->SetValue(tabletId, i, value, id); diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index db1b977b166..f9b74e17657 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -836,7 +836,7 @@ message TActivity { PERSQUEUE_MIRRORER = 451; LONG_TX_SERVICE = 452; LONG_TX_SERVICE_COMMIT = 453; - PERSQUEUE_READ_SPEED_LIMITER = 454; + PERSQUEUE_ACCOUNT_READ_QUOTER = 454; LONG_TX_SERVICE_ACQUIRE_SNAPSHOT = 455; BLOCKSTORE_PARTITION_NONREPL = 456; BS_PROXY_PATCH_ACTOR = 457; @@ -995,5 +995,6 @@ message TActivity { CMS_API_ADAPTER = 613; KQP_SCAN_FETCH_ACTOR = 614; COLUMNSHARD_CONVEYOR = 615; + PERSQUEUE_READ_QUOTER = 616; }; }; diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 6643bfe72df..0ded0094f30 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -813,6 +813,9 @@ message DescribeTopicResult { // Zero value means default limit: 1 MB per second. int64 partition_write_speed_bytes_per_second = 8; + int64 partition_total_read_speed_bytes_per_second = 14; + int64 partition_consumer_read_speed_bytes_per_second = 15; + // Burst size for write in partition, in bytes. // Zero value means default limit: 1 MB. int64 partition_write_burst_bytes = 9; diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 9058c9d1af6..8830ab95086 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -936,6 +936,12 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv Result.set_partition_write_burst_bytes(partConfig.GetBurstSize()); } + if (pqConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) { + auto readSpeedPerConsumer = partConfig.GetWriteSpeedInBytesPerSecond() * 2; + Result.set_partition_total_read_speed_bytes_per_second(readSpeedPerConsumer * pqConfig.GetQuotingConfig().GetMaxParallelConsumersPerPartition()); + Result.set_partition_consumer_read_speed_bytes_per_second(readSpeedPerConsumer); + } + for (const auto &codec : config.GetCodecs().GetIds()) { Result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1)); } diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 7b3ec66d9e5..79884fc97e4 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -1992,32 +1992,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.AnnoyingClient->DeleteTopic2(DEFAULT_TOPIC_NAME); } - - Y_UNIT_TEST(BigRead) { - NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(24_MB)); - server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8_MB, 86400, 20000000, "user", 2000000); - - server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE }); - - TString value(1_MB, 'x'); - for (ui32 i = 0; i < 32; ++i) - server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value); - - // trying to read small PQ messages in a big gRPC event - auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb - UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromDisk, 0); - UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromCache, 4); - - TInstant now(TInstant::Now()); - info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb - TDuration dur = TInstant::Now() - now; - UNIT_ASSERT_C(dur > TDuration::Seconds(7) && dur < TDuration::Seconds(20), "dur = " << dur); //speed limit is 2000kb/s and burst is 2000kb, so to read 24mb it will take at least 11 seconds - - server.AnnoyingClient->GetPartStatus({}, 1, true); - - } - - // expects that L2 size is 32Mb Y_UNIT_TEST(Cache) { NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(18_MB)); |