diff options
author | savnik <savnik@yandex-team.com> | 2023-08-31 09:48:34 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-08-31 10:11:40 +0300 |
commit | eda974389da3036fc702fce50ab75ad71ad13bfb (patch) | |
tree | 749f5cb43687cd74d9650dfdc3ba5af5624764c4 | |
parent | 697f4769f1195d8d2e8b69ba560556f8404dec55 (diff) | |
download | ydb-eda974389da3036fc702fce50ab75ad71ad13bfb.tar.gz |
Fix read without consumer
-rw-r--r-- | ydb/core/persqueue/partition_read.cpp | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/read_quoter.cpp | 15 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp | 8 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 9 |
4 files changed, 21 insertions, 14 deletions
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 6955ac96b26..1bc39fc3858 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -63,6 +63,9 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config } for (auto& consumer : hasReadRule) { auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx); + if (userInfo.NoConsumer) { + continue; + } THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); if (!userInfo.Important && userInfo.LabeledCounters) { diff --git a/ydb/core/persqueue/read_quoter.cpp b/ydb/core/persqueue/read_quoter.cpp index 91aea62fed7..68e1670deb6 100644 --- a/ydb/core/persqueue/read_quoter.cpp +++ b/ydb/core/persqueue/read_quoter.cpp @@ -74,12 +74,13 @@ void TReadQuoter::HandleWakeUp(TEvents::TEvWakeup::TPtr&, const TActorContext& c } void TReadQuoter::ProcessInflightQueue(const TActorContext& ctx) { + auto now = ctx.Now(); while (!WaitingInflightReadRequests.empty() && RequestsInflight < AppData(ctx)->PQConfig.GetMaxInflightReadRequestsPerPartition()) { auto readEvent(std::move(WaitingInflightReadRequests.front())); WaitingInflightReadRequests.pop_front(); StartQuoting(readEvent, ctx); if (WaitingInflightReadRequests.size() == 0) { - InflightLimitSlidingWindow.Update((ctx.Now() - InflightIsFullStartTime).MicroSeconds(), ctx.Now()); + InflightLimitSlidingWindow.Update((now - InflightIsFullStartTime).MicroSeconds(), now); UpdateCounters(ctx); } } @@ -154,10 +155,12 @@ void TReadQuoter::HandleUpdateAccountQuotaCounters(NAccountReadQuoterEvents::TEv } void TReadQuoter::UpdateCounters(const TActorContext& ctx) { + auto now = ctx.Now(); if (!WaitingInflightReadRequests.empty()) { - InflightLimitSlidingWindow.Update((ctx.Now() - InflightIsFullStartTime).MicroSeconds(), ctx.Now()); + InflightLimitSlidingWindow.Update((now - InflightIsFullStartTime).MicroSeconds(), now); + InflightIsFullStartTime = now; } else { - InflightLimitSlidingWindow.Update(ctx.Now()); + InflightLimitSlidingWindow.Update(now); } Send(PartitionActor, new NReadQuoterEvents::TEvQuotaCountersUpdated(InflightLimitSlidingWindow.GetValue() / 60)); } @@ -175,10 +178,8 @@ void TReadQuoter::HandlePoisonPill(TEvents::TEvPoisonPill::TPtr&, const TActorCo void TReadQuoter::UpdateQuota(const TActorContext &ctx) { TVector<std::pair<TString, ui64>> updatedQuotas; for (auto& [consumerStr, consumerQuota] : ConsumerQuotas) { - if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged( - GetConsumerReadBurst(ctx), - GetConsumerReadSpeed(ctx))) { - updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()}); + if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged(GetConsumerReadBurst(ctx), GetConsumerReadSpeed(ctx))) { + updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()}); } } auto totalQuotaUpdated = PartitionTotalQuotaTracker.UpdateConfigIfChanged(GetTotalPartitionReadBurst(ctx), GetTotalPartitionReadSpeed(ctx)); diff --git a/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp b/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp index f9c3f5261a2..55b99f3598c 100644 --- a/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp +++ b/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp @@ -8,11 +8,7 @@ Y_UNIT_TEST_SUITE(TMicrosecondsSlidingWindow) { Y_UNIT_TEST(Basic) { TMicrosecondsSlidingWindow sw(60, TDuration::Minutes(1)); - TInstant now = TInstant::Now(); - sw.Update(now); - now += TDuration::Seconds(60); - sw.Update(now); sw.Update(TDuration::Seconds(5).MicroSeconds(), now); now += TDuration::Seconds(58); @@ -47,6 +43,10 @@ Y_UNIT_TEST(Basic) { now += TDuration::Seconds(20); sw.Update(now); UNIT_ASSERT_EQUAL(sw.GetValue(), 40'000'000); + + now += TDuration::Seconds(180); + sw.Update(TDuration::Seconds(180).MicroSeconds(), now); + UNIT_ASSERT_EQUAL(sw.GetValue(), 60'000'000); } } //Y_UNIT_TEST_SUITE diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 9f99af51a86..d3d99588355 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -237,7 +237,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF case TClientMessage::kCommitOffsetRequest: { const auto& req = request.commit_offset_request(); - if(ReadWithoutConsumer) { + if (ReadWithoutConsumer) { return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit when reading without a consumer", ctx); } if (!RangesMode || !req.commit_offsets_size()) { @@ -967,7 +967,10 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c } for (auto& [_, holder] : Topics) { - holder.PipeClient = CreatePipeClient(holder.TabletID, ctx); + if (!ReadWithoutConsumer) { + holder.PipeClient = CreatePipeClient(holder.TabletID, ctx); + } + Y_VERIFY(holder.FullConverter); auto it = TopicGroups.find(holder.FullConverter->GetInternalName()); if (it != TopicGroups.end()) { @@ -980,7 +983,7 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c for (const auto& [topicName, topic] : Topics) { if (ReadWithoutConsumer) { if (topic.Groups.size() == 0) { - return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx); //savnik: groups for migration protocol? + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx); } for (auto group : topic.Groups) { SendLockPartitionToSelf(group-1, topicName, topic, ctx); |