aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2023-04-05 13:37:54 +0300
committeralexnick <alexnick@ydb.tech>2023-04-05 13:37:54 +0300
commit2ded29c1f43b56953cd49d1a4bc31c10a42bdd49 (patch)
tree5623870b3497a96dead41a835891407e068a12e7
parentddf13bed0be3a06ad5dce73428676112c0c49844 (diff)
downloadydb-2ded29c1f43b56953cd49d1a4bc31c10a42bdd49.tar.gz
fix for modern topic names
fix for database root
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp5
-rw-r--r--ydb/services/persqueue_v1/persqueue_compat_ut.cpp43
2 files changed, 28 insertions, 20 deletions
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 01395116458..6731ea50b4e 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -1074,9 +1074,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta
result.mutable_assigned()->set_read_offset(ev->Get()->Offset);
result.mutable_assigned()->set_end_offset(ev->Get()->EndOffset);
} else {
- auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetRoot());
-
- if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || database == AppData(ctx)->PQConfig.GetRoot()) {
+ auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetDatabase());
+ if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || database == AppData(ctx)->PQConfig.GetDatabase() || database == AppData(ctx)->PQConfig.GetTestDatabaseRoot()) {
result.mutable_start_partition_session_request()->mutable_partition_session()->set_path(it->second.Topic->GetFederationPathWithDC());
} else {
result.mutable_start_partition_session_request()->mutable_partition_session()->set_path(it->second.Topic->GetModernName());
diff --git a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
index dc84826b8db..e42e787a20f 100644
--- a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
@@ -24,6 +24,9 @@ public:
DbPath = "/Root/LbCommunal/account";
Server->ServerSettings.PQConfig.MutablePQDiscoveryConfig()->SetLbUserDatabaseRoot(DbRoot);
Server->ServerSettings.PQConfig.SetTestDatabaseRoot(DbRoot);
+
+ Server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(false);
+
Server->StartServer();
//Server->EnableLogs()
Server->EnableLogs({ NKikimrServices::KQP_PROXY }, NActors::NLog::PRI_EMERG);
@@ -308,28 +311,34 @@ Y_UNIT_TEST_SUITE(TPQCompatTest) {
UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == topic);
}
- for (auto topic : std::vector<TString> {"account/topic2", "account/topic2-mirrored-from-dc2"}) {
- grpc::ClientContext rcontext;
+ for (ui32 i = 0; i < 2; ++i) {
+ for (auto topic : std::vector<TString> {"account/topic2", "account/topic2-mirrored-from-dc2"}) {
+ grpc::ClientContext rcontext;
- auto readStream = TopicStubP_->StreamRead(&rcontext);
- UNIT_ASSERT(readStream);
- Ydb::Topic::StreamReadMessage::FromClient req;
- Ydb::Topic::StreamReadMessage::FromServer resp;
+ if (i > 0) {
+ rcontext.AddMetadata("x-ydb-database", "/Root/LbCommunal");
+ }
- req.mutable_init_request()->add_topics_read_settings()->set_path(topic);
+ auto readStream = TopicStubP_->StreamRead(&rcontext);
+ UNIT_ASSERT(readStream);
+ Ydb::Topic::StreamReadMessage::FromClient req;
+ Ydb::Topic::StreamReadMessage::FromServer resp;
- req.mutable_init_request()->set_consumer("user");
+ req.mutable_init_request()->add_topics_read_settings()->set_path(topic);
- Cerr << "BEFORE PARSING " << topic << "\n";
+ req.mutable_init_request()->set_consumer("user");
- UNIT_ASSERT(readStream->Write(req));
- UNIT_ASSERT(readStream->Read(&resp));
- Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
- UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse);
- UNIT_ASSERT(readStream->Read(&resp));
- Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
- UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
- UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == topic);
+ Cerr << "BEFORE PARSING " << topic << "\n";
+
+ UNIT_ASSERT(readStream->Write(req));
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse);
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == topic);
+ }
}
}