aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsavnik <savnik@yandex-team.com>2023-08-31 09:48:34 +0300
committersavnik <savnik@yandex-team.com>2023-08-31 10:11:40 +0300
commiteda974389da3036fc702fce50ab75ad71ad13bfb (patch)
tree749f5cb43687cd74d9650dfdc3ba5af5624764c4
parent697f4769f1195d8d2e8b69ba560556f8404dec55 (diff)
downloadydb-eda974389da3036fc702fce50ab75ad71ad13bfb.tar.gz
Fix read without consumer
-rw-r--r--ydb/core/persqueue/partition_read.cpp3
-rw-r--r--ydb/core/persqueue/read_quoter.cpp15
-rw-r--r--ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp8
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp9
4 files changed, 21 insertions, 14 deletions
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index 6955ac96b26..1bc39fc3858 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -63,6 +63,9 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
}
for (auto& consumer : hasReadRule) {
auto& userInfo = UsersInfoStorage->GetOrCreate(consumer, ctx);
+ if (userInfo.NoConsumer) {
+ continue;
+ }
THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer,
0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);
if (!userInfo.Important && userInfo.LabeledCounters) {
diff --git a/ydb/core/persqueue/read_quoter.cpp b/ydb/core/persqueue/read_quoter.cpp
index 91aea62fed7..68e1670deb6 100644
--- a/ydb/core/persqueue/read_quoter.cpp
+++ b/ydb/core/persqueue/read_quoter.cpp
@@ -74,12 +74,13 @@ void TReadQuoter::HandleWakeUp(TEvents::TEvWakeup::TPtr&, const TActorContext& c
}
void TReadQuoter::ProcessInflightQueue(const TActorContext& ctx) {
+ auto now = ctx.Now();
while (!WaitingInflightReadRequests.empty() && RequestsInflight < AppData(ctx)->PQConfig.GetMaxInflightReadRequestsPerPartition()) {
auto readEvent(std::move(WaitingInflightReadRequests.front()));
WaitingInflightReadRequests.pop_front();
StartQuoting(readEvent, ctx);
if (WaitingInflightReadRequests.size() == 0) {
- InflightLimitSlidingWindow.Update((ctx.Now() - InflightIsFullStartTime).MicroSeconds(), ctx.Now());
+ InflightLimitSlidingWindow.Update((now - InflightIsFullStartTime).MicroSeconds(), now);
UpdateCounters(ctx);
}
}
@@ -154,10 +155,12 @@ void TReadQuoter::HandleUpdateAccountQuotaCounters(NAccountReadQuoterEvents::TEv
}
void TReadQuoter::UpdateCounters(const TActorContext& ctx) {
+ auto now = ctx.Now();
if (!WaitingInflightReadRequests.empty()) {
- InflightLimitSlidingWindow.Update((ctx.Now() - InflightIsFullStartTime).MicroSeconds(), ctx.Now());
+ InflightLimitSlidingWindow.Update((now - InflightIsFullStartTime).MicroSeconds(), now);
+ InflightIsFullStartTime = now;
} else {
- InflightLimitSlidingWindow.Update(ctx.Now());
+ InflightLimitSlidingWindow.Update(now);
}
Send(PartitionActor, new NReadQuoterEvents::TEvQuotaCountersUpdated(InflightLimitSlidingWindow.GetValue() / 60));
}
@@ -175,10 +178,8 @@ void TReadQuoter::HandlePoisonPill(TEvents::TEvPoisonPill::TPtr&, const TActorCo
void TReadQuoter::UpdateQuota(const TActorContext &ctx) {
TVector<std::pair<TString, ui64>> updatedQuotas;
for (auto& [consumerStr, consumerQuota] : ConsumerQuotas) {
- if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged(
- GetConsumerReadBurst(ctx),
- GetConsumerReadSpeed(ctx))) {
- updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()});
+ if (consumerQuota.PartitionPerConsumerQuotaTracker.UpdateConfigIfChanged(GetConsumerReadBurst(ctx), GetConsumerReadSpeed(ctx))) {
+ updatedQuotas.push_back({consumerStr, consumerQuota.PartitionPerConsumerQuotaTracker.GetTotalSpeed()});
}
}
auto totalQuotaUpdated = PartitionTotalQuotaTracker.UpdateConfigIfChanged(GetTotalPartitionReadBurst(ctx), GetTotalPartitionReadSpeed(ctx));
diff --git a/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp b/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp
index f9c3f5261a2..55b99f3598c 100644
--- a/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp
+++ b/ydb/core/persqueue/ut/microseconds_sliding_window_ut.cpp
@@ -8,11 +8,7 @@ Y_UNIT_TEST_SUITE(TMicrosecondsSlidingWindow) {
Y_UNIT_TEST(Basic) {
TMicrosecondsSlidingWindow sw(60, TDuration::Minutes(1));
-
TInstant now = TInstant::Now();
- sw.Update(now);
- now += TDuration::Seconds(60);
- sw.Update(now);
sw.Update(TDuration::Seconds(5).MicroSeconds(), now);
now += TDuration::Seconds(58);
@@ -47,6 +43,10 @@ Y_UNIT_TEST(Basic) {
now += TDuration::Seconds(20);
sw.Update(now);
UNIT_ASSERT_EQUAL(sw.GetValue(), 40'000'000);
+
+ now += TDuration::Seconds(180);
+ sw.Update(TDuration::Seconds(180).MicroSeconds(), now);
+ UNIT_ASSERT_EQUAL(sw.GetValue(), 60'000'000);
}
} //Y_UNIT_TEST_SUITE
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 9f99af51a86..d3d99588355 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -237,7 +237,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
case TClientMessage::kCommitOffsetRequest: {
const auto& req = request.commit_offset_request();
- if(ReadWithoutConsumer) {
+ if (ReadWithoutConsumer) {
return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit when reading without a consumer", ctx);
}
if (!RangesMode || !req.commit_offsets_size()) {
@@ -967,7 +967,10 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c
}
for (auto& [_, holder] : Topics) {
- holder.PipeClient = CreatePipeClient(holder.TabletID, ctx);
+ if (!ReadWithoutConsumer) {
+ holder.PipeClient = CreatePipeClient(holder.TabletID, ctx);
+ }
+
Y_VERIFY(holder.FullConverter);
auto it = TopicGroups.find(holder.FullConverter->GetInternalName());
if (it != TopicGroups.end()) {
@@ -980,7 +983,7 @@ void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& c
for (const auto& [topicName, topic] : Topics) {
if (ReadWithoutConsumer) {
if (topic.Groups.size() == 0) {
- return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx); //savnik: groups for migration protocol?
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "explicitly specify the partitions when reading without a consumer", ctx);
}
for (auto group : topic.Groups) {
SendLockPartitionToSelf(group-1, topicName, topic, ctx);