aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-02-17 12:35:30 +0300
committerAlexey Bogolyubskiy <i@bogolyubskiyalexey.ru>2022-02-17 12:35:30 +0300
commitb19ffd720e0480be0e9474c947b5c9d9d5cbf3fa (patch)
treef0a55fbc165d5b8292e28f4bf32a70f38a3e2f29
parentdaf18fc9b115222e95f2e08b4fec7130c2eba979 (diff)
downloadydb-b19ffd720e0480be0e9474c947b5c9d9d5cbf3fa.tar.gz
fix partition status request in pqv1 and test for it LOGBROKER-7308
ref:b5588701a17843c719c964bbbcc3e223884fce19
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp10
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp34
2 files changed, 34 insertions, 10 deletions
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index f0b031051c6..82b7a0658aa 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -579,22 +579,14 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorCo
}
void TReadSessionActor::Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx) {
-
auto it = Partitions.find(ev->Get()->Partition.AssignId);
- if (it == Partitions.end()) {
- // Ignore request - client asking status after releasing of partition.
- return;
- }
- if (!it->second.Releasing) {
+ if (it == Partitions.end() || it->second.Releasing) {
// Ignore request - client asking status after releasing of partition.
return;
-
}
ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->Partition));
}
-
-
void TReadSessionActor::DropPartition(THashMap<ui64, TPartitionActorInfo>::iterator it, const TActorContext& ctx) {
ctx.Send(it->second.Actor, new TEvents::TEvPoisonPill());
bool res = ActualPartitionActors.erase(it->second.Actor);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 6c4cbdf1f09..30d9ab9b8ec 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3589,10 +3589,42 @@ namespace {
ui64 time = (TInstant::Now() + TDuration::Hours(4)).MilliSeconds();
runTest(legacyName, legacyName, topicName, srcId2, 7, time);
+ }
+ Y_UNIT_TEST(TestReadPartitionStatus) {
+ NPersQueue::TTestServer server;
+ TString topic = "topic1";
+ TString topicFullName = "rt3.dc1--" + topic;
- }
+ server.AnnoyingClient->CreateTopic(topicFullName, 1);
+ server.EnableLogs({ NKikimrServices::PQ_READ_PROXY});
+
+ auto driver = server.AnnoyingClient->GetDriver();
+
+ auto reader = CreateReader(
+ *driver,
+ NYdb::NPersQueue::TReadSessionSettings()
+ .AppendTopics(topic)
+ .ConsumerName("shared/user")
+ .ReadOnlyOriginal(true)
+ );
+ {
+ TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);
+ auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event);
+ UNIT_ASSERT(createStream);
+ Cerr << "Create stream event: " << createStream->DebugString() << Endl;
+ createStream->GetPartitionStream()->RequestStatus();
+ }
+ {
+ auto future = reader->WaitEvent();
+ UNIT_ASSERT(future.Wait(TDuration::Seconds(10)));
+ TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1);
+ auto partitionStatus = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamStatusEvent>(&*event);
+ UNIT_ASSERT(partitionStatus);
+ Cerr << "partition status: " << partitionStatus->DebugString() << Endl;
+ }
+ }
}
}