diff options
author | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-02-17 12:35:30 +0300 |
---|---|---|
committer | Alexey Bogolyubskiy <i@bogolyubskiyalexey.ru> | 2022-02-17 12:35:30 +0300 |
commit | b19ffd720e0480be0e9474c947b5c9d9d5cbf3fa (patch) | |
tree | f0a55fbc165d5b8292e28f4bf32a70f38a3e2f29 | |
parent | daf18fc9b115222e95f2e08b4fec7130c2eba979 (diff) | |
download | ydb-b19ffd720e0480be0e9474c947b5c9d9d5cbf3fa.tar.gz |
fix partition status request in pqv1 and test for it LOGBROKER-7308
ref:b5588701a17843c719c964bbbcc3e223884fce19
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 10 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 34 |
2 files changed, 34 insertions, 10 deletions
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index f0b031051c6..82b7a0658aa 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -579,22 +579,14 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorCo } void TReadSessionActor::Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx) { - auto it = Partitions.find(ev->Get()->Partition.AssignId); - if (it == Partitions.end()) { - // Ignore request - client asking status after releasing of partition. - return; - } - if (!it->second.Releasing) { + if (it == Partitions.end() || it->second.Releasing) { // Ignore request - client asking status after releasing of partition. return; - } ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->Partition)); } - - void TReadSessionActor::DropPartition(THashMap<ui64, TPartitionActorInfo>::iterator it, const TActorContext& ctx) { ctx.Send(it->second.Actor, new TEvents::TEvPoisonPill()); bool res = ActualPartitionActors.erase(it->second.Actor); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 6c4cbdf1f09..30d9ab9b8ec 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -3589,10 +3589,42 @@ namespace { ui64 time = (TInstant::Now() + TDuration::Hours(4)).MilliSeconds(); runTest(legacyName, legacyName, topicName, srcId2, 7, time); + } + Y_UNIT_TEST(TestReadPartitionStatus) { + NPersQueue::TTestServer server; + TString topic = "topic1"; + TString topicFullName = "rt3.dc1--" + topic; - } + server.AnnoyingClient->CreateTopic(topicFullName, 1); + server.EnableLogs({ NKikimrServices::PQ_READ_PROXY}); + + auto driver = server.AnnoyingClient->GetDriver(); + + auto reader = CreateReader( + *driver, + NYdb::NPersQueue::TReadSessionSettings() + .AppendTopics(topic) + .ConsumerName("shared/user") + .ReadOnlyOriginal(true) + ); + { + TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1); + auto createStream = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*event); + UNIT_ASSERT(createStream); + Cerr << "Create stream event: " << createStream->DebugString() << Endl; + createStream->GetPartitionStream()->RequestStatus(); + } + { + auto future = reader->WaitEvent(); + UNIT_ASSERT(future.Wait(TDuration::Seconds(10))); + TMaybe<NYdb::NPersQueue::TReadSessionEvent::TEvent> event = reader->GetEvent(true, 1); + auto partitionStatus = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamStatusEvent>(&*event); + UNIT_ASSERT(partitionStatus); + Cerr << "partition status: " << partitionStatus->DebugString() << Endl; + } + } } } |