aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2022-09-07 16:06:00 +0300
committerildar-khisam <ikhis@ydb.tech>2022-09-07 16:06:00 +0300
commit319e48ac807bde14af6ed2f6a0c8f6e1e27851de (patch)
treee4aece3acc91c8338099c1050df8859c131e24e6
parent85752c1abb97c325c4e5e371303f506bd3e08124 (diff)
downloadydb-319e48ac807bde14af6ed2f6a0c8f6e1e27851de.tar.gz
support write sessions with explicit partition_id
support write sessions with explicit partition_id
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp27
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp8
2 files changed, 23 insertions, 12 deletions
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 84d7a7ee75b..36d2613676a 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -393,8 +393,16 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
}
} else {
// TODO (ildar-khisam@): support other cases of producer_id / message_group_id / partition_id settings
- if (InitRequest.message_group_id() != InitRequest.producer_id() || InitRequest.producer_id().empty()) {
- CloseSession("only (producer_id == message_group_id) case is supported for now", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ // For now exactly two scenarios supported:
+ // 1. Non-empty producer_id == message_group_id
+ // 2. Non-empty producer_id && non-empty valid partition_id (explicit partitioning)
+ bool isScenarioSupported = (!InitRequest.producer_id().empty() && InitRequest.has_message_group_id() &&
+ InitRequest.message_group_id() == InitRequest.producer_id())
+ || (!InitRequest.producer_id().empty() && InitRequest.has_partition_id());
+
+ if (!isScenarioSupported) {
+ CloseSession("unsupported producer_id / message_group_id / partition_id settings in init request",
+ PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
}
@@ -410,14 +418,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
PeerName = event->PeerName;
- SourceId = InitRequest.message_group_id();
- // SourceId = [this]() {
- // if constexpr (UseMigrationProtocol) {
- // return InitRequest.message_group_id();
- // } else {
- // return InitRequest.message_group_id().empty() ? InitRequest.producer_id() : InitRequest.message_group_id();
- // }
- // }();
+ SourceId = [this]() {
+ if constexpr (UseMigrationProtocol) {
+ return InitRequest.message_group_id();
+ } else {
+ return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
+ }
+ }();
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest << " from " << PeerName);
//TODO: get user agent from headers
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 4e50f86513d..eb0ebabd50d 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -856,7 +856,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.mutable_init_request()->set_path("acc/topic3");
req.mutable_init_request()->set_producer_id("A");
- req.mutable_init_request()->set_message_group_id("A");
+ req.mutable_init_request()->set_partition_id(0);
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
@@ -864,6 +864,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(writeStream1->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kInitResponse);
+ UNIT_ASSERT_C(resp.init_response().partition_id() == 0, "unexpected partition_id");
//send some reads
req.Clear();
@@ -893,7 +894,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.mutable_init_request()->set_path("acc/topic3");
req.mutable_init_request()->set_producer_id("B");
- req.mutable_init_request()->set_message_group_id("B");
+ req.mutable_init_request()->set_partition_id(1);
if (!writeStream2->Write(req)) {
ythrow yexception() << "write fail";
@@ -901,6 +902,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(writeStream2->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kInitResponse);
+ UNIT_ASSERT_C(resp.init_response().partition_id() == 1, "unexpected partition_id");
//send some reads
req.Clear();
@@ -957,6 +959,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
TVector<i64> partition_ids;
//lock partition
UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Expect 1st start part, Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3");
partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id());
@@ -970,6 +973,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
resp.Clear();
UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Expect 2nd start part, Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3");
partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id());