diff options
author | ildar-khisam <[email protected]> | 2022-08-03 20:06:34 +0300 |
---|---|---|
committer | ildar-khisam <[email protected]> | 2022-08-03 20:06:34 +0300 |
commit | 58a4673f45222b6c60584270ad2fada532dabb88 (patch) | |
tree | 3ed59a6a51cad52db6f771de7d7c91f28aae0c44 | |
parent | 6e0b86fc8b39114155339a1677dacb695b405baa (diff) |
topic service partition id hotfix
-rw-r--r-- | ydb/services/persqueue_v1/actors/events.h | 18 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 75 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 22 |
3 files changed, 50 insertions, 65 deletions
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index 1837044a763..b986af3dfb5 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -295,15 +295,15 @@ struct TEvPQProxy { }; struct TEvStartRead : public NActors::TEventLocal<TEvStartRead, EvStartRead> { - TEvStartRead(const TPartitionId& partition, ui64 readOffset, ui64 commitOffset, bool verifyReadOffset) - : Partition(partition) + TEvStartRead(ui64 id, ui64 readOffset, ui64 commitOffset, bool verifyReadOffset) + : AssignId(id) , ReadOffset(readOffset) , CommitOffset(commitOffset) , VerifyReadOffset(verifyReadOffset) , Generation(0) { } - const TPartitionId Partition; + const ui64 AssignId; ui64 ReadOffset; ui64 CommitOffset; bool VerifyReadOffset; @@ -311,19 +311,19 @@ struct TEvPQProxy { }; struct TEvReleased : public NActors::TEventLocal<TEvReleased, EvReleased> { - TEvReleased(const TPartitionId& partition) - : Partition(partition) + TEvReleased(ui64 id) + : AssignId(id) { } - const TPartitionId Partition; + const ui64 AssignId; }; struct TEvGetStatus : public NActors::TEventLocal<TEvGetStatus, EvGetStatus> { - TEvGetStatus(const TPartitionId& partition) - : Partition(partition) + TEvGetStatus(ui64 id) + : AssignId(id) { } - const TPartitionId Partition; + const ui64 AssignId; }; diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 3229faa2e6d..c1a3164b2d5 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -120,39 +120,15 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF ctx.Send(ctx.SelfID, new TEvPQProxy::TEvDone()); return; } - auto converterFactory = TopicsHandler.GetConverterFactory(); - auto MakePartitionId = [&](auto& request) { - const ui64 id = [&request](){ - if constexpr (UseMigrationProtocol) { - return request.assign_id(); - } else { - return request.partition_session_id(); - } - }(); - Y_VERIFY(Partitions.find(id) != Partitions.end()); - - const auto& info = Partitions.at(id); - auto topic = info.Topic->GetFederationPath(); - const auto& cluster = info.Topic->GetCluster(); - ui64 partition = info.Partition.Partition; - - auto converter = converterFactory->MakeDiscoveryConverter( - std::move(topic), {}, cluster, Request->GetDatabaseName().GetOrElse(TString()) - ); - return TPartitionId{converter, partition, id}; + auto GetAssignId = [](auto& request) { + if constexpr (UseMigrationProtocol) { + return request.assign_id(); + } else { + return request.partition_session_id(); + } }; - -#define GET_PART_ID_OR_EXIT(request) \ -auto partId = MakePartitionId(request); \ -if (!partId.DiscoveryConverter->IsValid()) { \ - CloseSession(TStringBuilder() << "Invalid topic in request: " << partId.DiscoveryConverter->GetOriginalTopic() \ - << ", reason: " << partId.DiscoveryConverter->GetReason(), \ - PersQueue::ErrorCode::BAD_REQUEST, ctx); \ - return; \ -} - if constexpr (UseMigrationProtocol) { switch (request.request_case()) { case TClientMessage::kInitRequest: { @@ -160,9 +136,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ break; } case TClientMessage::kStatus: { - //const auto& req = request.status(); - GET_PART_ID_OR_EXIT(request.status()); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(partId)); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.status()))); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); Die(ctx); @@ -176,8 +150,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ break; } case TClientMessage::kReleased: { - GET_PART_ID_OR_EXIT(request.released()); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(partId)); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.released()))); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); Die(ctx); @@ -193,8 +166,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ const ui64 commitOffset = req.commit_offset(); const bool verifyReadOffset = req.verify_read_offset(); - GET_PART_ID_OR_EXIT(request.start_read()); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(partId, readOffset, commitOffset, verifyReadOffset)); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset)); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); Die(ctx); @@ -257,8 +229,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ break; } case TClientMessage::kPartitionSessionStatusRequest: { - GET_PART_ID_OR_EXIT(request.partition_session_status_request()); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(partId)); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.partition_session_status_request()))); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); Die(ctx); @@ -268,8 +239,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ } case TClientMessage::kStopPartitionSessionResponse: { - GET_PART_ID_OR_EXIT(request.stop_partition_session_response()); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(partId)); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.stop_partition_session_response()))); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); Die(ctx); @@ -284,8 +254,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \ const ui64 readOffset = req.read_offset(); const ui64 commitOffset = req.commit_offset(); - GET_PART_ID_OR_EXIT(req); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(partId, readOffset, commitOffset, req.has_read_offset())); + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(req), readOffset, commitOffset, req.has_read_offset())); if (!Request->GetStreamCtx()->Read()) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); Die(ctx); @@ -337,8 +306,6 @@ if (!partId.DiscoveryConverter->IsValid()) { \ } } -#undef GET_PART_ID_OR_EXIT - template<bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) { @@ -464,19 +431,15 @@ template<bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorContext& ctx) { RequestNotChecked = true; - auto it = Partitions.find(ev->Get()->Partition.AssignId); - if (it == Partitions.end()) { - return; - } - + auto it = Partitions.find(ev->Get()->AssignId); if (it == Partitions.end() || it->second.Releasing) { //do nothing - already released partition - LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for " << ev->Get()->Partition + LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for partition with assign id " << ev->Get()->AssignId << " at offset " << ev->Get()->ReadOffset); return; } LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for " - << ev->Get()->Partition << + << it->second.Partition << " at readOffset " << ev->Get()->ReadOffset << " commitOffset " << ev->Get()->CommitOffset); @@ -489,7 +452,7 @@ template<bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorContext& ctx) { RequestNotChecked = true; - auto it = Partitions.find(ev->Get()->Partition.AssignId); + auto it = Partitions.find(ev->Get()->AssignId); if (it == Partitions.end()) { return; } @@ -499,19 +462,19 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TP } Y_VERIFY(it->second.LockSent); - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client for " << ev->Get()->Partition); + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client for partition " << it->second.Partition); ReleasePartition(it, true, ctx); } template<bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx) { - auto it = Partitions.find(ev->Get()->Partition.AssignId); + auto it = Partitions.find(ev->Get()->AssignId); if (it == Partitions.end() || it->second.Releasing) { // Ignore request - client asking status after releasing of partition. return; } - ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->Partition)); + ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->AssignId)); } template<bool UseMigrationProtocol> diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index fe9e4011be2..7597d3c643d 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -242,6 +242,18 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(resp.assigned().partition() == 0); assignId = resp.assigned().assign_id(); + + req.Clear(); + req.mutable_start_read()->mutable_topic()->set_path("acc/topic1"); + req.mutable_start_read()->set_cluster("dc1"); + req.mutable_start_read()->set_partition(0); + req.mutable_start_read()->set_assign_id(354235); // invalid id should receive no reaction + + req.mutable_start_read()->set_read_offset(10); + UNIT_ASSERT_C(readStream->Write(req), "write fail"); + + Sleep(TDuration::MilliSeconds(100)); + req.Clear(); req.mutable_start_read()->mutable_topic()->set_path("acc/topic1"); req.mutable_start_read()->set_cluster("dc1"); @@ -409,6 +421,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT((partition_ids == TVector<i64>{0, 1})); assignId = resp.start_partition_session_request().partition_session().partition_session_id(); + + req.Clear(); + + // invalid id should receive no reaction + req.mutable_start_partition_session_response()->set_partition_session_id(1124134); + + UNIT_ASSERT_C(readStream->Write(req), "write fail"); + + Sleep(TDuration::MilliSeconds(100)); + req.Clear(); req.mutable_start_partition_session_response()->set_partition_session_id(assignId); |