summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <[email protected]>2022-08-03 20:06:34 +0300
committerildar-khisam <[email protected]>2022-08-03 20:06:34 +0300
commit58a4673f45222b6c60584270ad2fada532dabb88 (patch)
tree3ed59a6a51cad52db6f771de7d7c91f28aae0c44
parent6e0b86fc8b39114155339a1677dacb695b405baa (diff)
topic service partition id hotfix
-rw-r--r--ydb/services/persqueue_v1/actors/events.h18
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp75
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp22
3 files changed, 50 insertions, 65 deletions
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index 1837044a763..b986af3dfb5 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -295,15 +295,15 @@ struct TEvPQProxy {
};
struct TEvStartRead : public NActors::TEventLocal<TEvStartRead, EvStartRead> {
- TEvStartRead(const TPartitionId& partition, ui64 readOffset, ui64 commitOffset, bool verifyReadOffset)
- : Partition(partition)
+ TEvStartRead(ui64 id, ui64 readOffset, ui64 commitOffset, bool verifyReadOffset)
+ : AssignId(id)
, ReadOffset(readOffset)
, CommitOffset(commitOffset)
, VerifyReadOffset(verifyReadOffset)
, Generation(0)
{ }
- const TPartitionId Partition;
+ const ui64 AssignId;
ui64 ReadOffset;
ui64 CommitOffset;
bool VerifyReadOffset;
@@ -311,19 +311,19 @@ struct TEvPQProxy {
};
struct TEvReleased : public NActors::TEventLocal<TEvReleased, EvReleased> {
- TEvReleased(const TPartitionId& partition)
- : Partition(partition)
+ TEvReleased(ui64 id)
+ : AssignId(id)
{ }
- const TPartitionId Partition;
+ const ui64 AssignId;
};
struct TEvGetStatus : public NActors::TEventLocal<TEvGetStatus, EvGetStatus> {
- TEvGetStatus(const TPartitionId& partition)
- : Partition(partition)
+ TEvGetStatus(ui64 id)
+ : AssignId(id)
{ }
- const TPartitionId Partition;
+ const ui64 AssignId;
};
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 3229faa2e6d..c1a3164b2d5 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -120,39 +120,15 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvDone());
return;
}
- auto converterFactory = TopicsHandler.GetConverterFactory();
- auto MakePartitionId = [&](auto& request) {
- const ui64 id = [&request](){
- if constexpr (UseMigrationProtocol) {
- return request.assign_id();
- } else {
- return request.partition_session_id();
- }
- }();
- Y_VERIFY(Partitions.find(id) != Partitions.end());
-
- const auto& info = Partitions.at(id);
- auto topic = info.Topic->GetFederationPath();
- const auto& cluster = info.Topic->GetCluster();
- ui64 partition = info.Partition.Partition;
-
- auto converter = converterFactory->MakeDiscoveryConverter(
- std::move(topic), {}, cluster, Request->GetDatabaseName().GetOrElse(TString())
- );
- return TPartitionId{converter, partition, id};
+ auto GetAssignId = [](auto& request) {
+ if constexpr (UseMigrationProtocol) {
+ return request.assign_id();
+ } else {
+ return request.partition_session_id();
+ }
};
-
-#define GET_PART_ID_OR_EXIT(request) \
-auto partId = MakePartitionId(request); \
-if (!partId.DiscoveryConverter->IsValid()) { \
- CloseSession(TStringBuilder() << "Invalid topic in request: " << partId.DiscoveryConverter->GetOriginalTopic() \
- << ", reason: " << partId.DiscoveryConverter->GetReason(), \
- PersQueue::ErrorCode::BAD_REQUEST, ctx); \
- return; \
-}
-
if constexpr (UseMigrationProtocol) {
switch (request.request_case()) {
case TClientMessage::kInitRequest: {
@@ -160,9 +136,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
break;
}
case TClientMessage::kStatus: {
- //const auto& req = request.status();
- GET_PART_ID_OR_EXIT(request.status());
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(partId));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.status())));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
Die(ctx);
@@ -176,8 +150,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
break;
}
case TClientMessage::kReleased: {
- GET_PART_ID_OR_EXIT(request.released());
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(partId));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.released())));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
Die(ctx);
@@ -193,8 +166,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
const ui64 commitOffset = req.commit_offset();
const bool verifyReadOffset = req.verify_read_offset();
- GET_PART_ID_OR_EXIT(request.start_read());
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(partId, readOffset, commitOffset, verifyReadOffset));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
Die(ctx);
@@ -257,8 +229,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
break;
}
case TClientMessage::kPartitionSessionStatusRequest: {
- GET_PART_ID_OR_EXIT(request.partition_session_status_request());
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(partId));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.partition_session_status_request())));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
Die(ctx);
@@ -268,8 +239,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
}
case TClientMessage::kStopPartitionSessionResponse: {
- GET_PART_ID_OR_EXIT(request.stop_partition_session_response());
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(partId));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.stop_partition_session_response())));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
Die(ctx);
@@ -284,8 +254,7 @@ if (!partId.DiscoveryConverter->IsValid()) { \
const ui64 readOffset = req.read_offset();
const ui64 commitOffset = req.commit_offset();
- GET_PART_ID_OR_EXIT(req);
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(partId, readOffset, commitOffset, req.has_read_offset()));
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(req), readOffset, commitOffset, req.has_read_offset()));
if (!Request->GetStreamCtx()->Read()) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
Die(ctx);
@@ -337,8 +306,6 @@ if (!partId.DiscoveryConverter->IsValid()) { \
}
}
-#undef GET_PART_ID_OR_EXIT
-
template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) {
@@ -464,19 +431,15 @@ template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorContext& ctx) {
RequestNotChecked = true;
- auto it = Partitions.find(ev->Get()->Partition.AssignId);
- if (it == Partitions.end()) {
- return;
- }
-
+ auto it = Partitions.find(ev->Get()->AssignId);
if (it == Partitions.end() || it->second.Releasing) {
//do nothing - already released partition
- LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for " << ev->Get()->Partition
+ LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for partition with assign id " << ev->Get()->AssignId
<< " at offset " << ev->Get()->ReadOffset);
return;
}
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for "
- << ev->Get()->Partition <<
+ << it->second.Partition <<
" at readOffset " << ev->Get()->ReadOffset <<
" commitOffset " << ev->Get()->CommitOffset);
@@ -489,7 +452,7 @@ template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorContext& ctx) {
RequestNotChecked = true;
- auto it = Partitions.find(ev->Get()->Partition.AssignId);
+ auto it = Partitions.find(ev->Get()->AssignId);
if (it == Partitions.end()) {
return;
}
@@ -499,19 +462,19 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TP
}
Y_VERIFY(it->second.LockSent);
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client for " << ev->Get()->Partition);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client for partition " << it->second.Partition);
ReleasePartition(it, true, ctx);
}
template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx) {
- auto it = Partitions.find(ev->Get()->Partition.AssignId);
+ auto it = Partitions.find(ev->Get()->AssignId);
if (it == Partitions.end() || it->second.Releasing) {
// Ignore request - client asking status after releasing of partition.
return;
}
- ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->Partition));
+ ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->AssignId));
}
template<bool UseMigrationProtocol>
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index fe9e4011be2..7597d3c643d 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -242,6 +242,18 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(resp.assigned().partition() == 0);
assignId = resp.assigned().assign_id();
+
+ req.Clear();
+ req.mutable_start_read()->mutable_topic()->set_path("acc/topic1");
+ req.mutable_start_read()->set_cluster("dc1");
+ req.mutable_start_read()->set_partition(0);
+ req.mutable_start_read()->set_assign_id(354235); // invalid id should receive no reaction
+
+ req.mutable_start_read()->set_read_offset(10);
+ UNIT_ASSERT_C(readStream->Write(req), "write fail");
+
+ Sleep(TDuration::MilliSeconds(100));
+
req.Clear();
req.mutable_start_read()->mutable_topic()->set_path("acc/topic1");
req.mutable_start_read()->set_cluster("dc1");
@@ -409,6 +421,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT((partition_ids == TVector<i64>{0, 1}));
assignId = resp.start_partition_session_request().partition_session().partition_session_id();
+
+ req.Clear();
+
+ // invalid id should receive no reaction
+ req.mutable_start_partition_session_response()->set_partition_session_id(1124134);
+
+ UNIT_ASSERT_C(readStream->Write(req), "write fail");
+
+ Sleep(TDuration::MilliSeconds(100));
+
req.Clear();
req.mutable_start_partition_session_response()->set_partition_session_id(assignId);