diff options
author | ildar-khisam <ikhis@ydb.tech> | 2022-09-07 16:06:00 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2022-09-07 16:06:00 +0300 |
commit | 319e48ac807bde14af6ed2f6a0c8f6e1e27851de (patch) | |
tree | e4aece3acc91c8338099c1050df8859c131e24e6 | |
parent | 85752c1abb97c325c4e5e371303f506bd3e08124 (diff) | |
download | ydb-319e48ac807bde14af6ed2f6a0c8f6e1e27851de.tar.gz |
support write sessions with explicit partition_id
support write sessions with explicit partition_id
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 27 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 8 |
2 files changed, 23 insertions, 12 deletions
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 84d7a7ee75b..36d2613676a 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -393,8 +393,16 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt } } else { // TODO (ildar-khisam@): support other cases of producer_id / message_group_id / partition_id settings - if (InitRequest.message_group_id() != InitRequest.producer_id() || InitRequest.producer_id().empty()) { - CloseSession("only (producer_id == message_group_id) case is supported for now", PersQueue::ErrorCode::BAD_REQUEST, ctx); + // For now exactly two scenarios supported: + // 1. Non-empty producer_id == message_group_id + // 2. Non-empty producer_id && non-empty valid partition_id (explicit partitioning) + bool isScenarioSupported = (!InitRequest.producer_id().empty() && InitRequest.has_message_group_id() && + InitRequest.message_group_id() == InitRequest.producer_id()) + || (!InitRequest.producer_id().empty() && InitRequest.has_partition_id()); + + if (!isScenarioSupported) { + CloseSession("unsupported producer_id / message_group_id / partition_id settings in init request", + PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } } @@ -410,14 +418,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt PeerName = event->PeerName; - SourceId = InitRequest.message_group_id(); - // SourceId = [this]() { - // if constexpr (UseMigrationProtocol) { - // return InitRequest.message_group_id(); - // } else { - // return InitRequest.message_group_id().empty() ? InitRequest.producer_id() : InitRequest.message_group_id(); - // } - // }(); + SourceId = [this]() { + if constexpr (UseMigrationProtocol) { + return InitRequest.message_group_id(); + } else { + return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id(); + } + }(); LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest << " from " << PeerName); //TODO: get user agent from headers diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 4e50f86513d..eb0ebabd50d 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -856,7 +856,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.mutable_init_request()->set_path("acc/topic3"); req.mutable_init_request()->set_producer_id("A"); - req.mutable_init_request()->set_message_group_id("A"); + req.mutable_init_request()->set_partition_id(0); if (!writeStream1->Write(req)) { ythrow yexception() << "write fail"; @@ -864,6 +864,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(writeStream1->Read(&resp)); Cerr << "===Got response: " << resp.ShortDebugString() << Endl; UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kInitResponse); + UNIT_ASSERT_C(resp.init_response().partition_id() == 0, "unexpected partition_id"); //send some reads req.Clear(); @@ -893,7 +894,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.mutable_init_request()->set_path("acc/topic3"); req.mutable_init_request()->set_producer_id("B"); - req.mutable_init_request()->set_message_group_id("B"); + req.mutable_init_request()->set_partition_id(1); if (!writeStream2->Write(req)) { ythrow yexception() << "write fail"; @@ -901,6 +902,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(writeStream2->Read(&resp)); Cerr << "===Got response: " << resp.ShortDebugString() << Endl; UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kInitResponse); + UNIT_ASSERT_C(resp.init_response().partition_id() == 1, "unexpected partition_id"); //send some reads req.Clear(); @@ -957,6 +959,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { TVector<i64> partition_ids; //lock partition UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Expect 1st start part, Got response: " << resp.ShortDebugString() << Endl; UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3"); partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id()); @@ -970,6 +973,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { resp.Clear(); UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Expect 2nd start part, Got response: " << resp.ShortDebugString() << Endl; UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3"); partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id()); |