diff options
| author | savnik <[email protected]> | 2023-08-31 09:48:34 +0300 | 
|---|---|---|
| committer | savnik <[email protected]> | 2023-08-31 10:11:40 +0300 | 
| commit | eda974389da3036fc702fce50ab75ad71ad13bfb (patch) | |
| tree | 749f5cb43687cd74d9650dfdc3ba5af5624764c4 | |
| parent | 697f4769f1195d8d2e8b69ba560556f8404dec55 (diff) | |
Fix read without consumer
| -rw-r--r-- | ydb/core/persqueue/partition_read.cpp | 3 | ||||
| -rw-r--r-- | ydb/core/persqueue/read_quoter.cpp | 15 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp | 8 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 9 | 
4 files changed, 21 insertions, 14 deletions
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 6955ac96b26..1bc39fc3858 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -63,6 +63,9 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config      }      for (auto& consumer : hasReadRule) {          auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx); +        if (userInfo.NoConsumer) { +            continue; +        }          THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer,                                                                                 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);          if (!userInfo.Important && userInfo.LabeledCounters) { diff --git a/ydb/core/persqueue/read_quoter.cpp b/ydb/core/persqueue/read_quoter.cpp index 91aea62fed7..68e1670deb6 100644 --- a/ydb/core/persqueue/read_quoter.cpp +++ b/ydb/core/persqueue/read_quoter.cpp @@ -74,12 +74,13 @@ void TReadQuoter::HandleWakeUp(TEvents::TEvWakeup::TPtr&, const TActorContext& c  }  void TReadQuoter::ProcessInflightQueue(const TActorContext& ctx) { +    auto now = ctx.Now();      while (!WaitingInflightReadRequests.empty() && RequestsInflight < AppData(ctx)->PQConfig.GetMaxInflightReadRequestsPerPartition()) {          auto readEvent(std::move(WaitingInflightReadRequests.front()));          WaitingInflightReadRequests.pop_front();          StartQuoting(readEvent, ctx);          if (WaitingInflightReadRequests.size() == 0) { -            InflightLimitSlidingWindow.Update((ctx.Now() - InflightIsFullStartTime).MicroSeconds(), ctx.Now()); +            InflightLimitSlidingWindow.Update((now - InflightIsFullStartTime).MicroSeconds(), now);              UpdateCounters(ctx);          }      } @@ -154,10 +155,12 @@ void TReadQuoter::HandleUpdateAccountQuotaCounters(NAccountReadQuoterEvents::TEv  }  void TReadQuoter::UpdateCounters(const TActorContext& ctx) { +    auto now = ctx.Now();      if (!WaitingInflightReadRequests.empty()) { -        InflightLimitSlidingWindow.Update((ctx.Now() - InflightIsFullStartTime).MicroSeconds(), ctx.Now()); +        InflightLimitSlidingWindow.Update((now - InflightIsFullStartTime).MicroSeconds(), now); +        InflightIsFullStartTime = now;      } else { -        InflightLimitSlidingWindow.Update(ctx.Now()); +        InflightLimitSlidingWindow.Update(now);      }      Send(PartitionActor, new NReadQuoterEvents::TEvQuotaCountersUpdated(InflightLimitSlidingWindow.GetValue() / 60));  } @@ -175,10 +178,8 @@ void TReadQuoter::HandlePoisonPill(TEvents::TEvPoisonPill::TPtr&, const TActorCo  void TReadQuoter::UpdateQuota(const TActorContext &ctx) {      TVector<std::pair<TString, ui64>> updatedQuotas;      for (auto& [consumerStr, consumerQuota] : ConsumerQuotas) { -        if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged( -        GetConsumerReadBurst(ctx), -        GetConsumerReadSpeed(ctx))) { -             updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()}); +        if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged(GetConsumerReadBurst(ctx), GetConsumerReadSpeed(ctx))) { +            updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()});          }      }      auto totalQuotaUpdated = PartitionTotalQuotaTracker.UpdateConfigIfChanged(GetTotalPartitionReadBurst(ctx), GetTotalPartitionReadSpeed(ctx)); diff --git a/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp b/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp index f9c3f5261a2..55b99f3598c 100644 --- a/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp +++ b/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp @@ -8,11 +8,7 @@ Y_UNIT_TEST_SUITE(TMicrosecondsSlidingWindow) {  Y_UNIT_TEST(Basic) {      TMicrosecondsSlidingWindow sw(60, TDuration::Minutes(1)); -      TInstant now = TInstant::Now(); -    sw.Update(now); -    now += TDuration::Seconds(60); -    sw.Update(now);      sw.Update(TDuration::Seconds(5).MicroSeconds(), now);      now += TDuration::Seconds(58); @@ -47,6 +43,10 @@ Y_UNIT_TEST(Basic) {      now += TDuration::Seconds(20);      sw.Update(now);      UNIT_ASSERT_EQUAL(sw.GetValue(), 40'000'000); + +    now += TDuration::Seconds(180); +    sw.Update(TDuration::Seconds(180).MicroSeconds(), now); +    UNIT_ASSERT_EQUAL(sw.GetValue(), 60'000'000);  }  } //Y_UNIT_TEST_SUITE diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 9f99af51a86..d3d99588355 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -237,7 +237,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF              case TClientMessage::kCommitOffsetRequest: {                  const auto& req = request.commit_offset_request(); -                if(ReadWithoutConsumer) { +                if (ReadWithoutConsumer) {                      return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit when reading without a consumer", ctx);                  }                  if (!RangesMode || !req.commit_offsets_size()) { @@ -967,7 +967,10 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c      }      for (auto& [_, holder] : Topics) { -        holder.PipeClient = CreatePipeClient(holder.TabletID, ctx); +        if (!ReadWithoutConsumer) { +            holder.PipeClient = CreatePipeClient(holder.TabletID, ctx); +        } +                  Y_VERIFY(holder.FullConverter);          auto it = TopicGroups.find(holder.FullConverter->GetInternalName());          if (it != TopicGroups.end()) { @@ -980,7 +983,7 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c      for (const auto& [topicName, topic] : Topics) {          if (ReadWithoutConsumer) {              if (topic.Groups.size() == 0) { -                return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx); //savnik: groups for migration protocol? +                return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx);              }              for (auto group : topic.Groups) {                  SendLockPartitionToSelf(group-1, topicName, topic, ctx);  | 
