aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-06-23 15:43:04 +0300
committerkomels <komels@yandex-team.ru>2022-06-23 15:43:04 +0300
commit439ee34959ae6a27dc5ae8cc654804ef0d7eb280 (patch)
tree5c19190d0ee730644236788319ad6026ce7e09c4
parent8a5992c0672644d86d714b38434563201eab91a9 (diff)
downloadydb-439ee34959ae6a27dc5ae8cc654804ef0d7eb280.tar.gz
Refactor converters
Fix for LOGBROKER-7559 ref:0a5eeec92874536d09c3da5cd5c2f1ae8cc214ae
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.cpp50
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h32
-rw-r--r--ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp14
-rw-r--r--ydb/services/lib/actors/type_definitions.h2
-rw-r--r--ydb/services/persqueue_v1/actors/events.h4
-rw-r--r--ydb/services/persqueue_v1/actors/partition_id.h4
-rw-r--r--ydb/services/persqueue_v1/actors/read_info_actor.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/read_info_actor.h2
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp19
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp40
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp2
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.cpp10
12 files changed, 108 insertions, 73 deletions
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp
index e6c181c1d36..419383f3fd9 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.cpp
+++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp
@@ -191,19 +191,7 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr
topicPath.SkipPrefix("/");
Database = databaseBuf;
}
-/*
- if (!isRootDb) {
- for (auto& rootDb: rootDatabases) {
- TStringBuf rootBuf(rootDb);
- if (!databaseBuf.empty() && IsPathPrefix(databaseBuf, rootBuf) || IsPathPrefix(topicPath, rootBuf)) {
- isRootDb = true;
- root = rootDb;
- topicPath.SkipPrefix(rootBuf);
- break;
- }
- }
- }
-*/
+
OriginalTopic = topicPath;
if (!isRootDb && Database.Defined()) {
// Topic with valid non-root database. Parse as 'modern' name. Primary path is path in database.
@@ -239,9 +227,8 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr
TTopicConverterPtr TDiscoveryConverter::UpgradeToFullConverter(
const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const TString& ydbDatabaseRootOverride
) {
-
+ Y_VERIFY_S(Valid, Reason.c_str());
auto* res = new TTopicNameConverter(FstClass, PQPrefix, pqTabletConfig, ydbDatabaseRootOverride);
- res->InternalName = InternalName;
return TTopicConverterPtr(res);
}
@@ -510,6 +497,14 @@ TString TDiscoveryConverter::GetPrimaryPath() const {
CHECK_VALID_AND_RETURN(PrimaryPath);
}
+TString TDiscoveryConverter::GetOriginalPath() const {
+ if (!OriginalPath.empty()) {
+ return OriginalPath;
+ } else {
+ return GetPrimaryPath();
+ }
+}
+
const TMaybe<TString>& TDiscoveryConverter::GetSecondaryPath(const TString& database) {
if (!database.empty()) {
SetDatabase(database);
@@ -538,13 +533,6 @@ void TDiscoveryConverter::SetDatabase(const TString& database) {
PendingDatabase = false;
}
-TString TDiscoveryConverter::GetInternalName() const {
- if (InternalName.empty()) {
- return GetPrimaryPath();
- }
- CHECK_VALID_AND_RETURN(InternalName);
-}
-
const TString& TDiscoveryConverter::GetOriginalTopic() const {
return OriginalTopic;
}
@@ -655,6 +643,7 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
Y_VERIFY(!res->Dc.empty());
Y_VERIFY(!res->FullLegacyName.empty());
res->Account = *res->Account_;
+ res->InternalName = res->FullLegacyName;
}
return res;
}
@@ -702,6 +691,7 @@ void TTopicNameConverter::BuildInternals(const NKikimrPQ::TPQTabletConfig& confi
ClientsideName = path;
ShortClientsideName = path;
FullModernName = path;
+ InternalName = PrimaryPath;
} else {
SetDatabase(*Database);
Y_VERIFY(!FullLegacyName.empty());
@@ -716,6 +706,7 @@ void TTopicNameConverter::BuildInternals(const NKikimrPQ::TPQTabletConfig& confi
LegacyProducer = Account;
}
Y_VERIFY(!FullModernName.empty());
+ InternalName = FullLegacyName;
}
}
@@ -730,15 +721,13 @@ const TString& TTopicNameConverter::GetModernName() const {
TString TTopicNameConverter::GetShortLegacyName() const {
CHECK_VALID_AND_RETURN(ShortLegacyName);
}
-//
-//TString TTopicNameConverter::GetFullLegacyName() const {
-// if (FstClass) {
-// return TString("");
-// }
-// CHECK_VALID_AND_RETURN(FullLegacyName);
-//}
+
+TString TTopicNameConverter::GetInternalName() const {
+ CHECK_VALID_AND_RETURN(InternalName);
+}
const TString& TTopicNameConverter::GetClientsideName() const {
+ Y_VERIFY_S(Valid, Reason.c_str());
Y_VERIFY(!ClientsideName.empty());
return ClientsideName;
}
@@ -789,6 +778,7 @@ TString TTopicNameConverter::GetTopicForSrcIdHash() const {
}
TString TTopicNameConverter::GetSecondaryPath() const {
+ Y_VERIFY_S(Valid, Reason.c_str());
if (!FstClass) {
Y_VERIFY(SecondaryPath.Defined());
return *SecondaryPath;
@@ -829,7 +819,7 @@ TTopicsToConverter TTopicsListController::GetReadTopicsList(
<< "': " << converter->GetReason();
return;
}
- result.Topics[converter->GetInternalName()] = converter;
+ result.Topics[converter->GetOriginalPath()] = converter;
result.ClientTopics[topic].push_back(converter);
};
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index 14881cfbd32..c7e5b16f9fb 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -35,7 +35,7 @@ TString MakeConsumerPath(const TString& consumer);
if (!Valid) { \
return TString(); \
} else { \
- Y_VERIFY(!result.empty()); \
+ Y_VERIFY_S(!result.empty(), OriginalTopic.c_str()); \
return result; \
}
@@ -96,6 +96,8 @@ public:
*/
TString GetPrimaryPath() const;
+ TString GetOriginalPath() const;
+
/** Second path if applicable:
* Nothing for first class
* Nothing for topic with explicitly specified non-root Database
@@ -107,22 +109,11 @@ public:
*/
const TMaybe<TString>& GetSecondaryPath(const TString& databasePath);
- /** Unique convertable name for internal purposes. Maybe uses hashing/for mappings.
- * Single topic in any representation is supposed to have same internal name
- * DO NOT use for business logic, such as sensors, SourceIds, etc
- *
- * /user-database/dir/my-topic
- * OR
- * /user-database/dir/.my-topic/mirrored-from-dc
- *
- * */
- TString GetInternalName() const;
-
/**
Only for special cases.
*/
void SetPrimaryPath(const TString& path) {
- InternalName = PrimaryPath;
+ OriginalPath = PrimaryPath;
PrimaryPath = path;
}
@@ -148,8 +139,8 @@ protected:
TString PQPrefix;
//TVector<TString> RootDatabases;
+ TString OriginalPath;
TString PrimaryPath;
- TString InternalName;
bool PendingDatabase = false;
TMaybe<TString> SecondaryPath;
TMaybe<TString> Account_;
@@ -236,6 +227,18 @@ public:
* */
const TString& GetModernName() const;
+ /** Unique convertable name for internal purposes. Maybe uses hashing/for mappings.
+ * Single topic in any representation is supposed to have same internal name
+ * DO NOT use for business logic, such as sensors, SourceIds, etc
+ *
+ * /user-database/dir/my-topic
+ * OR
+ * /user-database/dir/.my-topic/mirrored-from-dc
+ *
+ * */
+ TString GetInternalName() const;
+
+
TString GetTopicForSrcId() const;
TString GetTopicForSrcIdHash() const;
@@ -251,6 +254,7 @@ private:
TString ClientsideName;
TString ShortClientsideName;
TString Account;
+ TString InternalName;
};
using TDiscoveryConverterPtr = std::shared_ptr<TDiscoveryConverter>;
diff --git a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
index b426096c93d..368949b8c23 100644
--- a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
+++ b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp
@@ -25,7 +25,7 @@ public:
);
UNIT_ASSERT(!DiscoveryConverter->FullLegacyName.empty());
UNIT_ASSERT(!DiscoveryConverter->ShortLegacyName.empty());
- UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath());
+ //UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath());
}
void BasicFirstClassChecks() {
@@ -35,7 +35,7 @@ public:
DiscoveryConverter->FullModernPath
);
- UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath());
+ //UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath());
}
void SetDatabase(const TString& database) {
@@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
Y_UNIT_TEST(FullLegacyPath) {
TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1"));
wrapper.SetConverter("/Root/PQ/rt3.dc1--account--topic", "", "/Root");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic");
+ //UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic");
}
Y_UNIT_TEST(MinimalName) {
@@ -171,12 +171,12 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetCluster(), "dc1");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcId(), "rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "account--topic");
- UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--account--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc1--account--topic");
}
Y_UNIT_TEST(Paths) {
@@ -198,6 +198,8 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account@path--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc1--account@path--topic");
+
}
{
NKikimrPQ::TPQTabletConfig pqConfig;
@@ -214,6 +216,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/.topic/mirrored-from-dc2");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc2--account@path--topic");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc2--account@path--topic");
}
}
@@ -232,6 +235,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "lb/database/my-stream");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "my-stream");
UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "my-stream");
+ UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "lb/database/my-stream");
}
}
@@ -244,6 +248,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");
UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account--topic");
+ UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account--topic");
converter = TTopicNameConverter::ForFederation("/Root/PQ", "", "rt3.sas--account@dir--topic", "/Root/PQ", "/Root", false);
UNIT_ASSERT_C(converter->IsValid(), converter->GetReason());
@@ -252,6 +257,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account@dir");
UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");
UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account@dir--topic");
+ UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account@dir--topic");
}
Y_UNIT_TEST(BadLegacyTopics) {
diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h
index 4708cf842ea..e13e5ff9d82 100644
--- a/ydb/services/lib/actors/type_definitions.h
+++ b/ydb/services/lib/actors/type_definitions.h
@@ -35,6 +35,6 @@ namespace NKikimr::NGRpcProxy {
TString FolderId;
};
- using TTopicTabletsPairs = TVector<TTopicInitInfo>;
+ using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>;
} // namespace NKikimr::NGRpcProxy
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index 8b7a1dbd0e3..ee8ddb7872a 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -71,11 +71,11 @@ struct TEvPQProxy {
struct TEvAuthResultOk : public NActors::TEventLocal<TEvAuthResultOk, EvAuthResultOk> {
- TEvAuthResultOk(const TTopicTabletsPairs&& topicAndTablets)
+ TEvAuthResultOk(const TTopicInitInfoMap&& topicAndTablets)
: TopicAndTablets(std::move(topicAndTablets))
{ }
- TTopicTabletsPairs TopicAndTablets;
+ TTopicInitInfoMap TopicAndTablets;
};
struct TEvSessionSetPreferredCluster : public NActors::TEventLocal<TEvSessionSetPreferredCluster, EvSessionSetPreferredCluster> {
diff --git a/ydb/services/persqueue_v1/actors/partition_id.h b/ydb/services/persqueue_v1/actors/partition_id.h
index 2f5af8673c3..64bd77be4a2 100644
--- a/ydb/services/persqueue_v1/actors/partition_id.h
+++ b/ydb/services/persqueue_v1/actors/partition_id.h
@@ -16,8 +16,8 @@ struct TPartitionId {
ui64 AssignId;
bool operator < (const TPartitionId& rhs) const {
- return std::make_tuple(AssignId, Partition, DiscoveryConverter->GetInternalName()) <
- std::make_tuple(rhs.AssignId, rhs.Partition, rhs.DiscoveryConverter->GetInternalName());
+ return std::make_tuple(AssignId, Partition, DiscoveryConverter->GetOriginalPath()) <
+ std::make_tuple(rhs.AssignId, rhs.Partition, rhs.DiscoveryConverter->GetOriginalPath());
}
};
diff --git a/ydb/services/persqueue_v1/actors/read_info_actor.cpp b/ydb/services/persqueue_v1/actors/read_info_actor.cpp
index 109debd32ba..12c63873f0a 100644
--- a/ydb/services/persqueue_v1/actors/read_info_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_info_actor.cpp
@@ -98,7 +98,7 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorC
NKikimrClient::TPersQueueRequest proto;
proto.MutableMetaRequest()->MutableCmdGetReadSessionsInfo()->SetClientId(ClientId);
- for (auto& t : TopicAndTablets) {
+ for (auto& [_, t] : TopicAndTablets) {
proto.MutableMetaRequest()->MutableCmdGetReadSessionsInfo()->AddTopic(t.TopicNameConverter->GetClientsideName());
}
diff --git a/ydb/services/persqueue_v1/actors/read_info_actor.h b/ydb/services/persqueue_v1/actors/read_info_actor.h
index 307153a9ed7..3ea8360d483 100644
--- a/ydb/services/persqueue_v1/actors/read_info_actor.h
+++ b/ydb/services/persqueue_v1/actors/read_info_actor.h
@@ -60,7 +60,7 @@ private:
TActorId AuthInitActor;
- TTopicTabletsPairs TopicAndTablets;
+ TTopicInitInfoMap TopicAndTablets;
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index d5f0dc3da10..ba707b095d0 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -93,9 +93,16 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
topicsIter->second.CloudId = pqDescr.GetPQTabletConfig().GetYcCloudId();
topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId();
topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId();
+ if (!topicsIter->second.DiscoveryConverter->IsValid()) {
+ TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503",
+ topicsIter->second.DiscoveryConverter->GetPrintableString().c_str());
+ CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx);
+ return false;
+ }
topicsIter->second.FullConverter = topicsIter->second.DiscoveryConverter->UpgradeToFullConverter(
pqDescr.GetPQTabletConfig(), AppData(ctx)->PQConfig.GetTestDatabaseRoot()
);
+ Y_VERIFY(topicsIter->second.FullConverter->IsValid());
return CheckTopicACL(entry, topicsIter->first, ctx);
}
@@ -107,7 +114,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon
auto i = 0u;
auto& topicsRequested = ev->Get()->TopicsRequested;
for (const auto& entry : ev->Get()->Result->ResultSet) {
- const auto& path = topicsRequested[i++]->GetInternalName();
+ const auto& path = topicsRequested[i++]->GetOriginalPath();
auto it = Topics.find(path);
Y_VERIFY(it != Topics.end());
@@ -116,8 +123,8 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon
const auto& topic = entry.ListNodeEntry->Children.at(0);
it->second.DiscoveryConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name)));
- Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second;
- Topics.erase(it);
+// Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second;
+// Topics.erase(it);
reDescribe = true;
continue;
@@ -241,9 +248,9 @@ bool TReadInitAndAuthActor::CheckACLPermissionsForNavigate(
void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) {
- TTopicTabletsPairs res;
- for (auto& [_, holder] : Topics) {
- res.emplace_back(decltype(res)::value_type({
+ TTopicInitInfoMap res;
+ for (auto& [name, holder] : Topics) {
+ res.insert(std::make_pair(name, TTopicInitInfo{
holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId
}));
}
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index a329268ae85..88e2c99c78b 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -367,7 +367,7 @@ void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
if (!p.second.Released) {
// ToDo[counters]
- auto it = TopicCounters.find(p.second.Partition.DiscoveryConverter->GetInternalName());
+ auto it = TopicCounters.find(p.second.Topic->GetInternalName());
Y_VERIFY(it != TopicCounters.end());
it->second.PartitionsInfly.Dec();
it->second.PartitionsReleased.Inc();
@@ -520,10 +520,10 @@ void TReadSessionActor<UseMigrationProtocol>::DropPartition(typename THashMap<ui
bool res = ActualPartitionActors.erase(it->second.Actor);
Y_VERIFY(res);
- if (--NumPartitionsFromTopic[it->second.Partition.DiscoveryConverter->GetInternalName()] == 0) {
+ if (--NumPartitionsFromTopic[it->second.Topic->GetInternalName()] == 0) {
//ToDo[counters]
- bool res = TopicCounters.erase(it->second.Partition.DiscoveryConverter->GetInternalName());
- Y_VERIFY(res);
+ bool res_ = TopicCounters.erase(it->second.Topic->GetInternalName());
+ Y_VERIFY(res_);
}
if (SessionsActive) {
@@ -715,7 +715,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
auto topicIter = TopicsList.ClientTopics.find(topic.topic());
Y_VERIFY(!topicIter.IsEnd());
for (const auto& converter: topicIter->second) {
- const auto internalName = converter->GetInternalName();
+ const auto internalName = converter->GetOriginalPath();
for (i64 pg: topic.partition_group_ids()) {
if (pg < 0) {
CloseSession("partition group id must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST,
@@ -918,8 +918,21 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
Y_VERIFY(!BalancersInitStarted);
BalancersInitStarted = true;
- for (auto& t : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?)
- auto& topicHolder = Topics[t.TopicNameConverter->GetInternalName()];
+ for (auto& [name, t] : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?)
+ auto internalName = t.TopicNameConverter->GetInternalName();
+ auto topicGrIter = TopicGroups.find(name);
+ if (!topicGrIter.IsEnd()) {
+ auto value = std::move(topicGrIter->second);
+ TopicGroups.erase(topicGrIter);
+ TopicGroups.insert(std::make_pair(internalName, std::move(value)));
+ }
+ auto rtfsIter = ReadFromTimestamp.find(name);
+ if (!rtfsIter.IsEnd()) {
+ auto value = std::move(rtfsIter->second);
+ ReadFromTimestamp.erase(rtfsIter);
+ ReadFromTimestamp[internalName] = value;
+ }
+ auto& topicHolder = Topics[internalName];
topicHolder.TabletID = t.TabletID;
topicHolder.FullConverter = t.TopicNameConverter;
topicHolder.CloudId = t.CloudId;
@@ -948,7 +961,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup());
} else {
- for (auto& t : ev->Get()->TopicAndTablets) {
+ for (auto& [name, t] : ev->Get()->TopicAndTablets) {
if (Topics.find(t.TopicNameConverter->GetInternalName()) == Topics.end()) {
CloseSession(
TStringBuilder() << "list of topics changed - new topic '"
@@ -1688,7 +1701,16 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&
Y_VERIFY(csize < Max<i32>());
auto jt = ReadFromTimestamp.find(it->second.Topic->GetInternalName());
- Y_VERIFY(jt != ReadFromTimestamp.end());
+ if (jt == ReadFromTimestamp.end()) {
+ LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Error searching for topic: " << it->second.Topic->GetInternalName()
+ << " (" << it->second.Topic->GetPrintableString() << ")");
+ for (const auto& [k, v] : ReadFromTimestamp) {
+ const auto& kk = k;
+ LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Have topic: " << kk);
+ }
+ CloseSession(TStringBuilder() << "Internal error", PersQueue::ErrorCode::ERROR, ctx);
+ return;
+ }
ui64 readTimestampMs = Max(ReadTimestampMs, jt->second);
TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(Reads.front()->Guid, ccount, csize, MaxTimeLagMs, readTimestampMs);
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 1c53e3c5f20..399d7c73fc6 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -302,6 +302,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction
{
TString error;
+
auto status = FillProposeRequestImpl(name, GetProtoRequest()->settings(), modifyScheme, ctx, false, error,
workingDir, proposal.Record.GetDatabaseName(), LocalCluster);
if (!error.empty()) {
@@ -312,7 +313,6 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction
const auto& pqDescr = modifyScheme.GetCreatePersQueueGroup();
const auto& config = pqDescr.GetPQTabletConfig();
-
if (!LocalCluster.empty() && config.GetLocalDC() && config.GetDC() != LocalCluster) {
Request_->RaiseIssue(FillIssue(TStringBuilder() << "Local cluster is not correct - provided '" << config.GetDC()
<< "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp
index edcba4537bf..badfbdcd3e7 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp
@@ -411,6 +411,11 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo
Y_VERIFY(description.PartitionsSize() > 0);
Y_VERIFY(description.HasPQTabletConfig());
InitialPQTabletConfig = description.GetPQTabletConfig();
+ if (!DiscoveryConverter->IsValid()) {
+ errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", DiscoveryConverter->GetPrintableString().c_str());
+ CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx);
+ return;
+ }
FullConverter = DiscoveryConverter->UpgradeToFullConverter(InitialPQTabletConfig,
AppData(ctx)->PQConfig.GetTestDatabaseRoot());
InitAfterDiscovery(ctx);
@@ -1122,8 +1127,9 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) {
ctx, NKikimrServices::PQ_WRITE_PROXY,
"write session: cookie=" << Cookie << " sessionId=" << OwnerCookie << " userAgent=\"" << UserAgent
<< "\" ip=" << PeerName << " proto=v1 "
- << " topic=" << DiscoveryConverter->GetInternalName()
- << " durationSec=" << (ctx.Now() - StartTime).Seconds());
+ << " topic=" << InitRequest.topic()
+ << " durationSec=" << (ctx.Now() - StartTime).Seconds()
+ );
LogSessionDeadline = ctx.Now() + TDuration::Hours(1) + TDuration::Seconds(rand() % 60);
}