summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <[email protected]>2022-07-11 16:12:22 +0300
committerkomels <[email protected]>2022-07-11 16:12:22 +0300
commitf0b01fc4537618a0687f3155ebbad1293c23a4a0 (patch)
tree1f84d4efc8e711b99cf9b77e61ad7a1ca643722b
parent78e5ca7e5aaf094dcaaacafc08856aedaef5a25f (diff)
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.cpp44
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h1
-rw-r--r--ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp46
3 files changed, 58 insertions, 33 deletions
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp
index 419383f3fd9..a7271ec043e 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.cpp
+++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp
@@ -32,6 +32,29 @@ namespace {
}
}
+void NormalizeAsFullPath(TString& path) {
+ if (!path.Empty() && !path.StartsWith("/")) {
+ path = TString("/") + path;
+ }
+}
+
+TString StripLeadSlash(const TString& path) {
+ if (!path.StartsWith("/")) {
+ return path;
+ } else {
+ return path.substr(1);
+ }
+}
+
+TString NormalizeFullPath(const TString& fullPath) {
+ if (!fullPath.Empty() && !fullPath.StartsWith("/")) {
+ return TString("/") + fullPath;
+ } else {
+ return fullPath;
+ }
+}
+
+
TString GetFullTopicPath(const NActors::TActorContext& ctx, const TMaybe<TString>& database, const TString& topicPath) {
if (NKikimr::AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
return FullPath(database, topicPath);
@@ -72,14 +95,6 @@ TString MakeConsumerPath(const TString& consumer) {
return res;
}
-TString NormalizeFullPath(const TString& fullPath) {
- if (fullPath.StartsWith("/"))
- return fullPath.substr(1);
- else {
- return fullPath;
- }
-}
-
TDiscoveryConverterPtr TDiscoveryConverter::ForFstClass(const TString& topic, const TString& database) {
auto* res = new TDiscoveryConverter();
res->FstClass = true;
@@ -206,8 +221,10 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr
return;
Y_VERIFY_DEBUG(!FullModernName.empty());
PrimaryPath = NKikimr::JoinPath({*Database, FullModernName});
+ NormalizeAsFullPath(PrimaryPath);
if (!FullLegacyName.empty()) {
SecondaryPath = NKikimr::JoinPath({PQPrefix, FullLegacyName});
+ NormalizeAsFullPath(SecondaryPath.GetRef());
}
BuildFromShortModernName();
} else {
@@ -247,6 +264,8 @@ void TDiscoveryConverter::BuildFstClassNames() {
PrimaryPath = TString(normTopic);
Database = "";
}
+ NormalizeAsFullPath(PrimaryPath);
+
FullModernPath = PrimaryPath;
};
@@ -263,6 +282,7 @@ void TDiscoveryConverter::BuildFromFederationPath(const TString& rootPrefix) {
BuildFromShortModernName();
Y_VERIFY_DEBUG(!FullLegacyName.empty());
PrimaryPath = NKikimr::JoinPath({rootPrefix, FullLegacyName});
+ NormalizeAsFullPath(PrimaryPath);
PendingDatabase = true;
}
@@ -460,7 +480,7 @@ void TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
ShortLegacyName = shortLegacyName;
FullLegacyName = fullLegacyName;
PrimaryPath = NKikimr::JoinPath({rootPrefix, fullLegacyName});
-
+ NormalizeAsFullPath(PrimaryPath);
FullModernName = fullModernName;
ModernName = modernName;
LbPath = NKikimr::JoinPath({*Account_, modernName});
@@ -528,6 +548,7 @@ void TDiscoveryConverter::SetDatabase(const TString& database) {
Y_VERIFY(!FullModernName.empty());
if (!SecondaryPath.Defined()) {
SecondaryPath = NKikimr::JoinPath({*Database, FullModernName});
+ NormalizeAsFullPath(SecondaryPath.GetRef());
}
FullModernPath = SecondaryPath.GetRef();
PendingDatabase = false;
@@ -635,6 +656,7 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
}
Y_VERIFY(!res->FullModernName.empty());
res->PrimaryPath = NKikimr::JoinPath({*res->Database, res->FullModernName});
+ NormalizeAsFullPath(res->PrimaryPath);
}
if (res->IsValid()) {
Y_VERIFY(res->Account_.Defined());
@@ -761,7 +783,7 @@ TString TTopicNameConverter::GetTopicForSrcId() const {
if (!IsValid())
return {};
if (FstClass) {
- return FullModernPath;
+ return StripLeadSlash(FullModernPath);
} else {
return GetClientsideName();
}
@@ -771,7 +793,7 @@ TString TTopicNameConverter::GetTopicForSrcIdHash() const {
if (!IsValid())
return {};
if (FstClass) {
- return FullModernPath;
+ return StripLeadSlash(FullModernPath);
} else {
return ShortLegacyName;
}
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index c7e5b16f9fb..559bf4f110b 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -381,6 +381,7 @@ public:
};
TString NormalizeFullPath(const TString& fullPath);
+TString StripLeadSlash(const TString& path);
} // namespace NPersQueue
diff --git a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
index 368949b8c23..d116f071151 100644
--- a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
+++ b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetShortLegacyName(), "account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetFullLegacyName(), "rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetDc(), "dc1");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--account--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/Root/PQ/rt3.dc1--account--topic");
wrapper.SetConverter("account--topic", "", "");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetFullLegacyName(), "rt3.dc1--account--topic");
@@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
Y_UNIT_TEST(FullLegacyPath) {
TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1"));
wrapper.SetConverter("/Root/PQ/rt3.dc1--account--topic", "", "/Root");
- //UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic");
+ //UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "/Root/PQ/rt3.dc1--account--topic");
}
Y_UNIT_TEST(MinimalName) {
@@ -96,30 +96,30 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
wrapper.SetConverter("rt3.dc1--topic", "", "");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetShortLegacyName(), "topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetFullLegacyName(), "rt3.dc1--topic");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/Root/PQ/rt3.dc1--topic");
wrapper.SetConverter("topic", "", "");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetFullLegacyName(), "rt3.dc1--topic");
wrapper.SetConverter("topic", "dc2", "");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetFullLegacyName(), "rt3.dc2--topic");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "Root/PQ/rt3.dc2--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/Root/PQ/rt3.dc2--topic");
}
Y_UNIT_TEST(FullLegacyNamesWithRootDatabase) {
TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1"));
wrapper.SetConverter("rt3.dc1--account--topic", "", "/Root");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--account--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/Root/PQ/rt3.dc1--account--topic");
}
Y_UNIT_TEST(WithLogbrokerPath) {
TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1"));
wrapper.SetConverter("account/topic", "", "");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--account--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/Root/PQ/rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetAccount(), "account");
wrapper.SetConverter("account/topic", "dc2", "/Root");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "Root/PQ/rt3.dc2--account--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/Root/PQ/rt3.dc2--account--topic");
}
Y_UNIT_TEST(AccountDatabase) {
@@ -127,10 +127,10 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
wrapper.SetConverter("account/topic", "", "");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetAccount(), "account");
wrapper.SetDatabase("/database");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetSecondaryPath(""), "database/topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetSecondaryPath(""), "/database/topic");
wrapper.SetConverter("account2/topic2", "", "");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetSecondaryPath("database2"), "database2/topic2");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetSecondaryPath("database2"), "/database2/topic2");
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetAccount(), "account2");
wrapper.SetConverter("rt3.dc1--account3--topic3", "", "");
@@ -145,13 +145,13 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
TConverterTestWrapper wrapper(true, "", "");
wrapper.SetConverter("account/stream", "", "/database");
wrapper.BasicFirstClassChecks();
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "database/account/stream");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/database/account/stream");
wrapper.SetConverter("/somedb/account/stream", "", "/somedb");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "somedb/account/stream");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/somedb/account/stream");
wrapper.SetConverter("/somedb2/account/stream", "", "");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "somedb2/account/stream");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetPrimaryPath(), "/somedb2/account/stream");
}
}
@@ -174,7 +174,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "topic");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--account--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "/Root/PQ/rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc1--account--topic");
}
@@ -193,8 +193,8 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
wrapper.SetConverter(pqConfig);
- UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "lb/account-database/path/topic");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetSecondaryPath(), "Root/PQ/rt3.dc1--account@path--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "/lb/account-database/path/topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetSecondaryPath(), "/Root/PQ/rt3.dc1--account@path--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account@path--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic");
@@ -211,12 +211,14 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
wrapper.SetConverter(pqConfig);
- UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "lb/account-database/path/.topic/mirrored-from-dc2");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetSecondaryPath(), "Root/PQ/rt3.dc2--account@path--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "/lb/account-database/path/.topic/mirrored-from-dc2");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetSecondaryPath(), "/Root/PQ/rt3.dc2--account@path--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/.topic/mirrored-from-dc2");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc2--account@path--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc2--account@path--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcId(), "rt3.dc2--account@path--topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "account@path--topic");
}
}
@@ -235,7 +237,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "lb/database/my-stream");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "my-stream");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "my-stream");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "lb/database/my-stream");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "/lb/database/my-stream");
}
}
@@ -247,7 +249,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
UNIT_ASSERT_VALUES_EQUAL(converter->GetCluster(), "sas");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");
- UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account--topic");
+ UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "/Root/PQ/rt3.sas--account--topic");
UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account--topic");
converter = TTopicNameConverter::ForFederation("/Root/PQ", "", "rt3.sas--account@dir--topic", "/Root/PQ", "/Root", false);
@@ -256,7 +258,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
UNIT_ASSERT_VALUES_EQUAL(converter->GetCluster(), "sas");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account@dir");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");
- UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account@dir--topic");
+ UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "/Root/PQ/rt3.sas--account@dir--topic");
UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account@dir--topic");
}
@@ -278,7 +280,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
UNIT_ASSERT_VALUES_EQUAL(converter->GetCluster(), "sas");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");
- UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "LbCommunal/account/topic");
+ UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "/LbCommunal/account/topic");
converter = TTopicNameConverter::ForFederation(
"/Root/PQ", "", "mirrored-from-sas", "/LbCommunal/account/dir/.topic", "/LbCommunal/account", false, "", "account"
@@ -288,7 +290,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
UNIT_ASSERT_VALUES_EQUAL(converter->GetCluster(), "sas");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account@dir");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");
- UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "LbCommunal/account/dir/.topic/mirrored-from-sas");
+ UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "/LbCommunal/account/dir/.topic/mirrored-from-sas");
}
Y_UNIT_TEST(BadModernTopics) {