diff options
author | alexnick <alexnick@ydb.tech> | 2023-04-05 13:37:54 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-04-05 13:37:54 +0300 |
commit | 2ded29c1f43b56953cd49d1a4bc31c10a42bdd49 (patch) | |
tree | 5623870b3497a96dead41a835891407e068a12e7 | |
parent | ddf13bed0be3a06ad5dce73428676112c0c49844 (diff) | |
download | ydb-2ded29c1f43b56953cd49d1a4bc31c10a42bdd49.tar.gz |
fix for modern topic names
fix for database root
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 5 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_compat_ut.cpp | 43 |
2 files changed, 28 insertions, 20 deletions
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 01395116458..6731ea50b4e 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -1074,9 +1074,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta result.mutable_assigned()->set_read_offset(ev->Get()->Offset); result.mutable_assigned()->set_end_offset(ev->Get()->EndOffset); } else { - auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetRoot()); - - if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || database == AppData(ctx)->PQConfig.GetRoot()) { + auto database = Request->GetDatabaseName().GetOrElse(AppData(ctx)->PQConfig.GetDatabase()); + if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || database == AppData(ctx)->PQConfig.GetDatabase() || database == AppData(ctx)->PQConfig.GetTestDatabaseRoot()) { result.mutable_start_partition_session_request()->mutable_partition_session()->set_path(it->second.Topic->GetFederationPathWithDC()); } else { result.mutable_start_partition_session_request()->mutable_partition_session()->set_path(it->second.Topic->GetModernName()); diff --git a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp index dc84826b8db..e42e787a20f 100644 --- a/ydb/services/persqueue_v1/persqueue_compat_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_compat_ut.cpp @@ -24,6 +24,9 @@ public: DbPath = "/Root/LbCommunal/account"; Server->ServerSettings.PQConfig.MutablePQDiscoveryConfig()->SetLbUserDatabaseRoot(DbRoot); Server->ServerSettings.PQConfig.SetTestDatabaseRoot(DbRoot); + + Server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(false); + Server->StartServer(); //Server->EnableLogs() Server->EnableLogs({ NKikimrServices::KQP_PROXY }, NActors::NLog::PRI_EMERG); @@ -308,28 +311,34 @@ Y_UNIT_TEST_SUITE(TPQCompatTest) { UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == topic); } - for (auto topic : std::vector<TString> {"account/topic2", "account/topic2-mirrored-from-dc2"}) { - grpc::ClientContext rcontext; + for (ui32 i = 0; i < 2; ++i) { + for (auto topic : std::vector<TString> {"account/topic2", "account/topic2-mirrored-from-dc2"}) { + grpc::ClientContext rcontext; - auto readStream = TopicStubP_->StreamRead(&rcontext); - UNIT_ASSERT(readStream); - Ydb::Topic::StreamReadMessage::FromClient req; - Ydb::Topic::StreamReadMessage::FromServer resp; + if (i > 0) { + rcontext.AddMetadata("x-ydb-database", "/Root/LbCommunal"); + } - req.mutable_init_request()->add_topics_read_settings()->set_path(topic); + auto readStream = TopicStubP_->StreamRead(&rcontext); + UNIT_ASSERT(readStream); + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; - req.mutable_init_request()->set_consumer("user"); + req.mutable_init_request()->add_topics_read_settings()->set_path(topic); - Cerr << "BEFORE PARSING " << topic << "\n"; + req.mutable_init_request()->set_consumer("user"); - UNIT_ASSERT(readStream->Write(req)); - UNIT_ASSERT(readStream->Read(&resp)); - Cerr << "===Got response: " << resp.ShortDebugString() << Endl; - UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); - UNIT_ASSERT(readStream->Read(&resp)); - Cerr << "===Got response: " << resp.ShortDebugString() << Endl; - UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); - UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == topic); + Cerr << "BEFORE PARSING " << topic << "\n"; + + UNIT_ASSERT(readStream->Write(req)); + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == topic); + } } } |