aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-07-25 19:05:01 +0300
committerroot <root@qavm-2ed34686.qemu>2023-07-25 19:05:01 +0300
commit6780bb36eaf2c3d3fd68bc20ddc754969d90c5b8 (patch)
treea2a00142f303f150e2fe3abc192381e6c8009f8a
parente80ef95b850cc629524127e0bd25ebb6672f2ca8 (diff)
downloadydb-6780bb36eaf2c3d3fd68bc20ddc754969d90c5b8.tar.gz
Bugfix
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.cpp125
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h10
-rw-r--r--ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp40
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp17
4 files changed, 154 insertions, 38 deletions
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp
index 09b14ffdb8..04ab589e98 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.cpp
+++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp
@@ -115,10 +115,15 @@ TDiscoveryConverterPtr TDiscoveryConverter::ForFstClass(const TString& topic, co
return TDiscoveryConverterPtr(res);
}
+bool BasicNameChecks(const TStringBuf& name) {
+ return (!name.empty() && !name.Contains("//"));
+}
+
TDiscoveryConverterPtr TDiscoveryConverter::ForFederation(
const TString& topic, const TString& dc, const TString& localDc, const TString& database,
const TString& pqNormalizedPrefix
) {
+
auto* res = new TDiscoveryConverter();
res->PQPrefix = pqNormalizedPrefix;
res->FstClass = false;
@@ -126,9 +131,16 @@ TDiscoveryConverterPtr TDiscoveryConverter::ForFederation(
res->LocalDc = localDc;
TStringBuf topicBuf{topic};
TStringBuf dbBuf{database};
+ if (!BasicNameChecks(topicBuf)) {
+ res->Valid = false;
+ res->Reason = TStringBuilder() << "Bad topic name for federation: " << topic;
+ return NPersQueue::TDiscoveryConverterPtr(res);
+ }
+
topicBuf.SkipPrefix("/");
dbBuf.SkipPrefix("/");
dbBuf.ChopSuffix("/");
+
res->BuildForFederation(dbBuf, topicBuf);
return NPersQueue::TDiscoveryConverterPtr(res);
}
@@ -183,8 +195,9 @@ TDiscoveryConverter::TDiscoveryConverter(bool firstClass,
// No legacy names required;
OriginalTopic = pqTabletConfig.GetTopicPath();
BuildFstClassNames();
+ return;
} else {
- BuildForFederation(*Database, path); //, rootDatabases);
+ BuildForFederation(*Database, path);
}
}
@@ -192,6 +205,8 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr
//, const TVector<TString>& rootDatabases
) {
topicPath.SkipPrefix("/");
+ CHECK_SET_VALID(!topicPath.empty(), "Invalid topic path (only account provided?)", return);
+ CHECK_SET_VALID(!topicPath.EndsWith("/"), "Invalid topic path 0 triling '/'", return);
if (FstClass) {
// No legacy names required;
OriginalTopic = topicPath;
@@ -216,6 +231,7 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr
SkipPathPrefix(topicPath, databaseBuf);
Database = databaseBuf;
}
+ CHECK_SET_VALID(!topicPath.empty(), "Bad topic name (only account provided?)", return);
OriginalTopic = topicPath;
if (!isRootDb && Database.Defined()) {
@@ -225,28 +241,35 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr
return;
}
if (!parsed) {
- ParseModernPath(topicPath);
+ if(!ParseModernPath(topicPath))
+ return;
}
- if (!Valid)
- return;
- Y_VERIFY_DEBUG(!FullModernName.empty());
+ CHECK_SET_VALID(
+ !FullModernName.empty(),
+ TStringBuilder() << "Internal error: Could not parse topic name (federation path was assumed)" << OriginalTopic,
+ return
+ );
+
PrimaryPath = NKikimr::JoinPath({*Database, FullModernName});
NormalizeAsFullPath(PrimaryPath);
if (!FullLegacyName.empty()) {
SecondaryPath = NKikimr::JoinPath({PQPrefix, FullLegacyName});
NormalizeAsFullPath(SecondaryPath.GetRef());
}
- BuildFromShortModernName();
+ if (!BuildFromShortModernName())
+ return;
} else {
if (root.empty()) {
root = PQPrefix;
}
// Topic from PQ root - this is either federation path (account/topic)
if (topicPath.find("/") != TString::npos) {
- BuildFromFederationPath(root);
+ auto ok = BuildFromFederationPath(root);
+ Y_UNUSED(ok);
} else {
// OR legacy name (rt3.sas--account--topic)
- BuildFromLegacyName(root); // Sets primary path;
+ auto ok = BuildFromLegacyName(root); // Sets primary path;
+ Y_UNUSED(ok);
}
}
}
@@ -280,24 +303,39 @@ void TDiscoveryConverter::BuildFstClassNames() {
NormalizeAsFullPath(PrimaryPath);
FullModernPath = PrimaryPath;
+ CHECK_SET_VALID(
+ !FullModernPath.empty(),
+ TStringBuilder() << "Internal error: could not build modern name for first class topic: " << OriginalTopic,
+ return;
+ );
};
-void TDiscoveryConverter::BuildFromFederationPath(const TString& rootPrefix) {
+bool TDiscoveryConverter::BuildFromFederationPath(const TString& rootPrefix) {
// This is federation (not first class) and we have topic specified as a path;
// So only convention supported is 'account/dir/topic' and account in path matches LB account;
TStringBuf topic(OriginalTopic);
LbPath = OriginalTopic;
TStringBuf fst, snd;
auto res = topic.TrySplit("/", fst, snd);
- Y_VERIFY(res);
+ CHECK_SET_VALID(res, TStringBuilder() << "Could not split federation path: " << OriginalTopic, return false);
Account_ = fst;
- ParseModernPath(snd);
- BuildFromShortModernName();
- Y_VERIFY_DEBUG(!FullLegacyName.empty());
+
+ if (!ParseModernPath(snd))
+ return false;
+ if (!BuildFromShortModernName()) {
+ return false;
+ }
+ CHECK_SET_VALID(
+ !FullLegacyName.empty(),
+ TStringBuilder() << "Internal error: couldn't build legacy-style name for topic " << OriginalTopic,
+ return false
+ );
+
PrimaryPath = NKikimr::JoinPath({rootPrefix, FullLegacyName});
NormalizeAsFullPath(PrimaryPath);
PendingDatabase = true;
+ return true;
}
bool TDiscoveryConverter::TryParseModernMirroredPath(TStringBuf path) {
@@ -316,12 +354,12 @@ bool TDiscoveryConverter::TryParseModernMirroredPath(TStringBuf path) {
FullModernName = path;
ModernName = fst;
if (Account_.Defined()) {
- BuildFromShortModernName();
+ return BuildFromShortModernName();
}
return true;
}
-void TDiscoveryConverter::ParseModernPath(const TStringBuf& path) {
+bool TDiscoveryConverter::ParseModernPath(const TStringBuf& path) {
// This is federation (not first class) and we have topic specified as a path;
// So only convention supported is 'account/dir/topic' and account in path matches LB account;
TStringBuilder pathAfterAccount;
@@ -338,15 +376,20 @@ void TDiscoveryConverter::ParseModernPath(const TStringBuf& path) {
} else {
pathAfterAccount << path;
}
+ CHECK_SET_VALID(BasicNameChecks(pathAfterAccount), "Bad topic name", return false);
ModernName = path;
FullModernName = pathAfterAccount;
if (Account_.Defined()) {
- BuildFromShortModernName();
+ return BuildFromShortModernName();
}
+ return true;
}
-void TDiscoveryConverter::BuildFromShortModernName() {
- Y_VERIFY(!ModernName.empty());
+bool TDiscoveryConverter::BuildFromShortModernName() {
+ CHECK_SET_VALID(
+ !ModernName.empty(), TStringBuilder() << "Could not parse topic name: " << OriginalTopic, return false
+ );
+
TStringBuf pathBuf(ModernName);
TStringBuilder legacyName;
TString legacyProducer;
@@ -381,15 +424,16 @@ void TDiscoveryConverter::BuildFromShortModernName() {
if (Dc.empty()) {
Dc = LocalDc;
CHECK_SET_VALID(!LocalDc.empty(), "Cannot determine DC: should specify either with Dc option or LocalDc option",
- return);
+ return false);
}
LbPath = lbPath;
FullLegacyName = TStringBuilder() << "rt3." << Dc << "--" << ShortLegacyName;
LegacyProducer = legacyProducer;
LegacyLogtype = logtype;
+ return true;
}
-void TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool forceFullName) {
+bool TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool forceFullName) {
TStringBuf topic (OriginalTopic);
bool hasDcInName = topic.Contains("rt3.");
TStringBuf fst, snd;
@@ -399,13 +443,14 @@ void TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
CHECK_SET_VALID(hasDcInName,
TStringBuilder() << "Invalid topic name - " << OriginalTopic
<< " - expected legacy-style name like rt3.<dc>--<account>--<topic>",
- return);
+ return false);
}
if (Dc.empty() && !hasDcInName) {
- Y_VERIFY(!FstClass);
+ CHECK_SET_VALID(!FstClass, TStringBuilder() << "Internal error: FirstClass mode enabled, but trying to parse Legacy-style name: "
+ << OriginalTopic, return false;);
CHECK_SET_VALID(!LocalDc.empty(),
"Cannot determine DC: should specify either in topic name, Dc option or LocalDc option",
- return);
+ return false);
Dc = LocalDc;
}
@@ -413,14 +458,15 @@ void TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
if (hasDcInName) {
fullLegacyName = topic;
auto res = topic.SkipPrefix("rt3.");
- CHECK_SET_VALID(res, "Malformed full legacy topic name", return);
+ CHECK_SET_VALID(res, "Malformed full legacy topic name", return false);
res = topic.TrySplit("--", fst, snd);
- CHECK_SET_VALID(res, "Malformed legacy style topic name: contains 'rt3.', but no '--'.", return);
- CHECK_SET_VALID(Dc.empty() || Dc == fst, "DC specified both in topic name and separate option and they mismatch", return);
+ CHECK_SET_VALID(res, "Malformed legacy style topic name: contains 'rt3.', but no '--'.", return false);
+ CHECK_SET_VALID(Dc.empty() || Dc == fst, "DC specified both in topic name and separate option and they mismatch", return false);
Dc = fst;
topic = snd;
} else {
- Y_VERIFY(!Dc.empty());
+ CHECK_SET_VALID(!Dc.empty(), TStringBuilder() << "Internal error: Could not determine DC (despite beleiving the name contins one) for topic "
+ << OriginalTopic, return false;);
TStringBuilder builder;
builder << "rt3." << Dc << "--" << topic;
fullLegacyName = builder;
@@ -466,14 +512,17 @@ void TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
topicName = topic;
}
modernName << topicName;
- Y_VERIFY(!Dc.empty());
+ CHECK_SET_VALID(!Dc.empty(), TStringBuilder() << "Internal error: Could not determine DC for topic: "
+ << OriginalTopic, return false);
+
bool isMirrored = (!LocalDc.empty() && Dc != LocalDc);
if (isMirrored) {
fullModernName << topicName << "-mirrored-from-" << Dc;
} else {
fullModernName << topicName;
}
- Y_VERIFY(!fullLegacyName.empty());
+ CHECK_SET_VALID(!fullLegacyName.empty(), TStringBuilder() << "Could not form a full legacy name for topic: "
+ << OriginalTopic, return false);
ShortLegacyName = shortLegacyName;
FullLegacyName = fullLegacyName;
@@ -489,6 +538,7 @@ void TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
} else {
SetDatabase("");
}
+ return true;
}
bool TDiscoveryConverter::IsValid() const {
@@ -597,7 +647,9 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
}
res->OriginalTopic = schemeName;
- res->BuildFromLegacyName(TString(normRoot), true);
+ auto buildOk = res->BuildFromLegacyName(TString(normRoot), true);
+ if (!buildOk)
+ return res;
if (res->Valid && !isLocal && res->Dc == localDc) {
res->Valid = false;
res->Reason = TStringBuilder() << "Topic '" << schemeName << "' created as non-local in local cluster";
@@ -642,7 +694,10 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
return res;
}
res->Dc = localDc;
- res->ParseModernPath(fullPath);
+ auto ok = res->ParseModernPath(fullPath);
+ if (!ok) {
+ return res;
+ }
} else {
if (!parsed) {
res->Valid = false;
@@ -652,7 +707,13 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
return res;
}
}
- Y_VERIFY(!res->FullModernName.empty());
+ if (res->FullModernName.empty()) {
+ res->Valid = false;
+ res->Reason = TStringBuilder() << "Internal error: FullModernName empty in TopicConverter(for schema) for topic: "
+ << schemeName;
+
+ return res;
+ }
res->PrimaryPath = NKikimr::JoinPath({*res->Database, res->FullModernName});
NormalizeAsFullPath(res->PrimaryPath);
}
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index 1c1926444e..c2fc748ca2 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -57,8 +57,8 @@ class TDiscoveryConverter {
private:
void BuildForFederation(const TStringBuf& databaseBuf, TStringBuf topicPath);
void BuildFstClassNames();
- void BuildFromFederationPath(const TString& rootPrefix);
- void BuildFromShortModernName();
+ [[nodiscard]] bool BuildFromFederationPath(const TString& rootPrefix);
+ [[nodiscard]] bool BuildFromShortModernName();
protected:
TDiscoveryConverter() = default;
@@ -70,9 +70,9 @@ protected:
TDiscoveryConverter(bool firstClass, const TString& pqNormalizedPrefix,
const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const TString& ydbDatabaseRootOverride);
- void BuildFromLegacyName(const TString& rootPrefix, bool forceFullname = false);
- bool TryParseModernMirroredPath(TStringBuf path);
- void ParseModernPath(const TStringBuf& path);
+ [[nodiscard]] bool BuildFromLegacyName(const TString& rootPrefix, bool forceFullname = false);
+ [[nodiscard]] bool TryParseModernMirroredPath(TStringBuf path);
+ [[nodiscard]] bool ParseModernPath(const TStringBuf& path);
public:
bool IsValid() const;
diff --git a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
index 240f3da2c6..d96f26a3df 100644
--- a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
+++ b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
@@ -69,6 +69,7 @@ public:
Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
Y_UNIT_TEST(FullLegacyNames) {
+
TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1"));
wrapper.SetConverter("rt3.dc1--account--topic", "", "");
UNIT_ASSERT_C(wrapper.DiscoveryConverter->IsValid(), wrapper.DiscoveryConverter->GetReason());
@@ -125,7 +126,9 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
Y_UNIT_TEST(AccountDatabase) {
TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1"));
wrapper.SetConverter("account/topic", "", "");
+ UNIT_ASSERT_C(wrapper.DiscoveryConverter->IsValid(), wrapper.DiscoveryConverter->GetReason());
UNIT_ASSERT_VALUES_EQUAL(wrapper.GetAccount(), "account");
+
wrapper.SetDatabase("/database");
UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetSecondaryPath(""), "/database/topic");
@@ -172,8 +175,43 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
);
UNIT_ASSERT_VALUES_EQUAL(converter->GetFullModernName(), "account/account");
UNIT_ASSERT_VALUES_EQUAL(converter->GetSecondaryPath("account"), "/account/account/account");
+
+ converter = converterFactory.MakeDiscoveryConverter(
+ "account/", {}, "dc1", "account"
+ );
+ UNIT_ASSERT(!converter->IsValid());
+ UNIT_ASSERT(!converter->GetReason().Contains("Internal error"));
+ }
+ Y_UNIT_TEST(DiscoveryConverter) {
+ auto converterFactory = NPersQueue::TTopicNamesConverterFactory(
+ false, "/Root/PQ", "dc1", {}
+ );
+ TVector<TString> badNames = {"account//", "account/", "/"};
+ for (const auto& name : badNames) {
+ auto converter = converterFactory.MakeDiscoveryConverter(
+ name, {}, "dc1", ""
+ );
+ UNIT_ASSERT_C(!converter->IsValid(), name);
+ UNIT_ASSERT(!converter->GetReason().Contains("Internal error"));
}
+ }
+ Y_UNIT_TEST(EmptyModern) {
+ auto converterFactory = NPersQueue::TTopicNamesConverterFactory(
+ false, "/Root/PQ", "dc1", {}
+ );
+ auto converter = converterFactory.MakeDiscoveryConverter(
+ "account/", {}, "dc1", "account"
+ );
+ UNIT_ASSERT_VALUES_EQUAL(converter->IsValid(), false);
+
+ converter = converterFactory.MakeDiscoveryConverter(
+ "/account", {}, "dc1", ""
+ );
+ UNIT_ASSERT(converter->IsValid());
+ UNIT_ASSERT_VALUES_EQUAL(converter->GetFullModernName(), "account");
+
+ }
Y_UNIT_TEST(FirstClass) {
TConverterTestWrapper wrapper(true, "", "");
@@ -411,4 +449,4 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
}
}
-} // NTests \ No newline at end of file
+} // NTests
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index d32271e93a..da8a148f91 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1601,6 +1601,23 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(res);
}
+ Y_UNIT_TEST(BadTopic) {
+ NPersQueue::TTestServer server;
+ server.AnnoyingClient->CreateTopic("rt3.dc1--topic", 1);
+
+ auto driver = server.AnnoyingClient->GetDriver();
+
+ auto writer = CreateSimpleWriter(*driver, "/topic/", "test source ID");
+ bool gotException = false;
+ try {
+ writer->GetInitSeqNo();
+ } catch(...) {
+ gotException = true;
+ }
+ UNIT_ASSERT(gotException);
+ }
+
+
Y_UNIT_TEST(SetupWriteSessionOnDisabledCluster) {
TPersQueueV1TestServer server;
SET_LOCALS;