diff options
author | komels <komels@yandex-team.ru> | 2022-06-23 15:43:04 +0300 |
---|---|---|
committer | komels <komels@yandex-team.ru> | 2022-06-23 15:43:04 +0300 |
commit | 439ee34959ae6a27dc5ae8cc654804ef0d7eb280 (patch) | |
tree | 5c19190d0ee730644236788319ad6026ce7e09c4 | |
parent | 8a5992c0672644d86d714b38434563201eab91a9 (diff) | |
download | ydb-439ee34959ae6a27dc5ae8cc654804ef0d7eb280.tar.gz |
Refactor converters
Fix for LOGBROKER-7559
ref:0a5eeec92874536d09c3da5cd5c2f1ae8cc214ae
12 files changed, 108 insertions, 73 deletions
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp index e6c181c1d36..419383f3fd9 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.cpp +++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp @@ -191,19 +191,7 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr topicPath.SkipPrefix("/"); Database = databaseBuf; } -/* - if (!isRootDb) { - for (auto& rootDb: rootDatabases) { - TStringBuf rootBuf(rootDb); - if (!databaseBuf.empty() && IsPathPrefix(databaseBuf, rootBuf) || IsPathPrefix(topicPath, rootBuf)) { - isRootDb = true; - root = rootDb; - topicPath.SkipPrefix(rootBuf); - break; - } - } - } -*/ + OriginalTopic = topicPath; if (!isRootDb && Database.Defined()) { // Topic with valid non-root database. Parse as 'modern' name. Primary path is path in database. @@ -239,9 +227,8 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr TTopicConverterPtr TDiscoveryConverter::UpgradeToFullConverter( const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const TString& ydbDatabaseRootOverride ) { - + Y_VERIFY_S(Valid, Reason.c_str()); auto* res = new TTopicNameConverter(FstClass, PQPrefix, pqTabletConfig, ydbDatabaseRootOverride); - res->InternalName = InternalName; return TTopicConverterPtr(res); } @@ -510,6 +497,14 @@ TString TDiscoveryConverter::GetPrimaryPath() const { CHECK_VALID_AND_RETURN(PrimaryPath); } +TString TDiscoveryConverter::GetOriginalPath() const { + if (!OriginalPath.empty()) { + return OriginalPath; + } else { + return GetPrimaryPath(); + } +} + const TMaybe<TString>& TDiscoveryConverter::GetSecondaryPath(const TString& database) { if (!database.empty()) { SetDatabase(database); @@ -538,13 +533,6 @@ void TDiscoveryConverter::SetDatabase(const TString& database) { PendingDatabase = false; } -TString TDiscoveryConverter::GetInternalName() const { - if (InternalName.empty()) { - return GetPrimaryPath(); - } - CHECK_VALID_AND_RETURN(InternalName); -} - const TString& TDiscoveryConverter::GetOriginalTopic() const { return OriginalTopic; } @@ -655,6 +643,7 @@ TTopicConverterPtr TTopicNameConverter::ForFederation( Y_VERIFY(!res->Dc.empty()); Y_VERIFY(!res->FullLegacyName.empty()); res->Account = *res->Account_; + res->InternalName = res->FullLegacyName; } return res; } @@ -702,6 +691,7 @@ void TTopicNameConverter::BuildInternals(const NKikimrPQ::TPQTabletConfig& confi ClientsideName = path; ShortClientsideName = path; FullModernName = path; + InternalName = PrimaryPath; } else { SetDatabase(*Database); Y_VERIFY(!FullLegacyName.empty()); @@ -716,6 +706,7 @@ void TTopicNameConverter::BuildInternals(const NKikimrPQ::TPQTabletConfig& confi LegacyProducer = Account; } Y_VERIFY(!FullModernName.empty()); + InternalName = FullLegacyName; } } @@ -730,15 +721,13 @@ const TString& TTopicNameConverter::GetModernName() const { TString TTopicNameConverter::GetShortLegacyName() const { CHECK_VALID_AND_RETURN(ShortLegacyName); } -// -//TString TTopicNameConverter::GetFullLegacyName() const { -// if (FstClass) { -// return TString(""); -// } -// CHECK_VALID_AND_RETURN(FullLegacyName); -//} + +TString TTopicNameConverter::GetInternalName() const { + CHECK_VALID_AND_RETURN(InternalName); +} const TString& TTopicNameConverter::GetClientsideName() const { + Y_VERIFY_S(Valid, Reason.c_str()); Y_VERIFY(!ClientsideName.empty()); return ClientsideName; } @@ -789,6 +778,7 @@ TString TTopicNameConverter::GetTopicForSrcIdHash() const { } TString TTopicNameConverter::GetSecondaryPath() const { + Y_VERIFY_S(Valid, Reason.c_str()); if (!FstClass) { Y_VERIFY(SecondaryPath.Defined()); return *SecondaryPath; @@ -829,7 +819,7 @@ TTopicsToConverter TTopicsListController::GetReadTopicsList( << "': " << converter->GetReason(); return; } - result.Topics[converter->GetInternalName()] = converter; + result.Topics[converter->GetOriginalPath()] = converter; result.ClientTopics[topic].push_back(converter); }; diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 14881cfbd32..c7e5b16f9fb 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -35,7 +35,7 @@ TString MakeConsumerPath(const TString& consumer); if (!Valid) { \ return TString(); \ } else { \ - Y_VERIFY(!result.empty()); \ + Y_VERIFY_S(!result.empty(), OriginalTopic.c_str()); \ return result; \ } @@ -96,6 +96,8 @@ public: */ TString GetPrimaryPath() const; + TString GetOriginalPath() const; + /** Second path if applicable: * Nothing for first class * Nothing for topic with explicitly specified non-root Database @@ -107,22 +109,11 @@ public: */ const TMaybe<TString>& GetSecondaryPath(const TString& databasePath); - /** Unique convertable name for internal purposes. Maybe uses hashing/for mappings. - * Single topic in any representation is supposed to have same internal name - * DO NOT use for business logic, such as sensors, SourceIds, etc - * - * /user-database/dir/my-topic - * OR - * /user-database/dir/.my-topic/mirrored-from-dc - * - * */ - TString GetInternalName() const; - /** Only for special cases. */ void SetPrimaryPath(const TString& path) { - InternalName = PrimaryPath; + OriginalPath = PrimaryPath; PrimaryPath = path; } @@ -148,8 +139,8 @@ protected: TString PQPrefix; //TVector<TString> RootDatabases; + TString OriginalPath; TString PrimaryPath; - TString InternalName; bool PendingDatabase = false; TMaybe<TString> SecondaryPath; TMaybe<TString> Account_; @@ -236,6 +227,18 @@ public: * */ const TString& GetModernName() const; + /** Unique convertable name for internal purposes. Maybe uses hashing/for mappings. + * Single topic in any representation is supposed to have same internal name + * DO NOT use for business logic, such as sensors, SourceIds, etc + * + * /user-database/dir/my-topic + * OR + * /user-database/dir/.my-topic/mirrored-from-dc + * + * */ + TString GetInternalName() const; + + TString GetTopicForSrcId() const; TString GetTopicForSrcIdHash() const; @@ -251,6 +254,7 @@ private: TString ClientsideName; TString ShortClientsideName; TString Account; + TString InternalName; }; using TDiscoveryConverterPtr = std::shared_ptr<TDiscoveryConverter>; diff --git a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp index b426096c93d..368949b8c23 100644 --- a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp +++ b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp @@ -25,7 +25,7 @@ public: ); UNIT_ASSERT(!DiscoveryConverter->FullLegacyName.empty()); UNIT_ASSERT(!DiscoveryConverter->ShortLegacyName.empty()); - UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath()); + //UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath()); } void BasicFirstClassChecks() { @@ -35,7 +35,7 @@ public: DiscoveryConverter->FullModernPath ); - UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath()); + //UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath()); } void SetDatabase(const TString& database) { @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) { Y_UNIT_TEST(FullLegacyPath) { TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1")); wrapper.SetConverter("/Root/PQ/rt3.dc1--account--topic", "", "/Root"); - UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic"); + //UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic"); } Y_UNIT_TEST(MinimalName) { @@ -171,12 +171,12 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) { UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetCluster(), "dc1"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcId(), "rt3.dc1--account--topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "account--topic"); - UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account--topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--account--topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/topic"); + UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc1--account--topic"); } Y_UNIT_TEST(Paths) { @@ -198,6 +198,8 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) { UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account@path--topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic"); + UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc1--account@path--topic"); + } { NKikimrPQ::TPQTabletConfig pqConfig; @@ -214,6 +216,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) { UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/.topic/mirrored-from-dc2"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc2--account@path--topic"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic"); + UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc2--account@path--topic"); } } @@ -232,6 +235,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) { UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "lb/database/my-stream"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "my-stream"); UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "my-stream"); + UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "lb/database/my-stream"); } } @@ -244,6 +248,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) { UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account"); UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic"); UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account--topic"); + UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account--topic"); converter = TTopicNameConverter::ForFederation("/Root/PQ", "", "rt3.sas--account@dir--topic", "/Root/PQ", "/Root", false); UNIT_ASSERT_C(converter->IsValid(), converter->GetReason()); @@ -252,6 +257,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) { UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account@dir"); UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic"); UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account@dir--topic"); + UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account@dir--topic"); } Y_UNIT_TEST(BadLegacyTopics) { diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h index 4708cf842ea..e13e5ff9d82 100644 --- a/ydb/services/lib/actors/type_definitions.h +++ b/ydb/services/lib/actors/type_definitions.h @@ -35,6 +35,6 @@ namespace NKikimr::NGRpcProxy { TString FolderId; }; - using TTopicTabletsPairs = TVector<TTopicInitInfo>; + using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>; } // namespace NKikimr::NGRpcProxy diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index 8b7a1dbd0e3..ee8ddb7872a 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -71,11 +71,11 @@ struct TEvPQProxy { struct TEvAuthResultOk : public NActors::TEventLocal<TEvAuthResultOk, EvAuthResultOk> { - TEvAuthResultOk(const TTopicTabletsPairs&& topicAndTablets) + TEvAuthResultOk(const TTopicInitInfoMap&& topicAndTablets) : TopicAndTablets(std::move(topicAndTablets)) { } - TTopicTabletsPairs TopicAndTablets; + TTopicInitInfoMap TopicAndTablets; }; struct TEvSessionSetPreferredCluster : public NActors::TEventLocal<TEvSessionSetPreferredCluster, EvSessionSetPreferredCluster> { diff --git a/ydb/services/persqueue_v1/actors/partition_id.h b/ydb/services/persqueue_v1/actors/partition_id.h index 2f5af8673c3..64bd77be4a2 100644 --- a/ydb/services/persqueue_v1/actors/partition_id.h +++ b/ydb/services/persqueue_v1/actors/partition_id.h @@ -16,8 +16,8 @@ struct TPartitionId { ui64 AssignId; bool operator < (const TPartitionId& rhs) const { - return std::make_tuple(AssignId, Partition, DiscoveryConverter->GetInternalName()) < - std::make_tuple(rhs.AssignId, rhs.Partition, rhs.DiscoveryConverter->GetInternalName()); + return std::make_tuple(AssignId, Partition, DiscoveryConverter->GetOriginalPath()) < + std::make_tuple(rhs.AssignId, rhs.Partition, rhs.DiscoveryConverter->GetOriginalPath()); } }; diff --git a/ydb/services/persqueue_v1/actors/read_info_actor.cpp b/ydb/services/persqueue_v1/actors/read_info_actor.cpp index 109debd32ba..12c63873f0a 100644 --- a/ydb/services/persqueue_v1/actors/read_info_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_info_actor.cpp @@ -98,7 +98,7 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorC NKikimrClient::TPersQueueRequest proto; proto.MutableMetaRequest()->MutableCmdGetReadSessionsInfo()->SetClientId(ClientId); - for (auto& t : TopicAndTablets) { + for (auto& [_, t] : TopicAndTablets) { proto.MutableMetaRequest()->MutableCmdGetReadSessionsInfo()->AddTopic(t.TopicNameConverter->GetClientsideName()); } diff --git a/ydb/services/persqueue_v1/actors/read_info_actor.h b/ydb/services/persqueue_v1/actors/read_info_actor.h index 307153a9ed7..3ea8360d483 100644 --- a/ydb/services/persqueue_v1/actors/read_info_actor.h +++ b/ydb/services/persqueue_v1/actors/read_info_actor.h @@ -60,7 +60,7 @@ private: TActorId AuthInitActor; - TTopicTabletsPairs TopicAndTablets; + TTopicInitInfoMap TopicAndTablets; TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index d5f0dc3da10..ba707b095d0 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -93,9 +93,16 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( topicsIter->second.CloudId = pqDescr.GetPQTabletConfig().GetYcCloudId(); topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId(); topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId(); + if (!topicsIter->second.DiscoveryConverter->IsValid()) { + TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", + topicsIter->second.DiscoveryConverter->GetPrintableString().c_str()); + CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx); + return false; + } topicsIter->second.FullConverter = topicsIter->second.DiscoveryConverter->UpgradeToFullConverter( pqDescr.GetPQTabletConfig(), AppData(ctx)->PQConfig.GetTestDatabaseRoot() ); + Y_VERIFY(topicsIter->second.FullConverter->IsValid()); return CheckTopicACL(entry, topicsIter->first, ctx); } @@ -107,7 +114,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon auto i = 0u; auto& topicsRequested = ev->Get()->TopicsRequested; for (const auto& entry : ev->Get()->Result->ResultSet) { - const auto& path = topicsRequested[i++]->GetInternalName(); + const auto& path = topicsRequested[i++]->GetOriginalPath(); auto it = Topics.find(path); Y_VERIFY(it != Topics.end()); @@ -116,8 +123,8 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon const auto& topic = entry.ListNodeEntry->Children.at(0); it->second.DiscoveryConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name))); - Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second; - Topics.erase(it); +// Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second; +// Topics.erase(it); reDescribe = true; continue; @@ -241,9 +248,9 @@ bool TReadInitAndAuthActor::CheckACLPermissionsForNavigate( void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) { - TTopicTabletsPairs res; - for (auto& [_, holder] : Topics) { - res.emplace_back(decltype(res)::value_type({ + TTopicInitInfoMap res; + for (auto& [name, holder] : Topics) { + res.insert(std::make_pair(name, TTopicInitInfo{ holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId })); } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index a329268ae85..88e2c99c78b 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -367,7 +367,7 @@ void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) { if (!p.second.Released) { // ToDo[counters] - auto it = TopicCounters.find(p.second.Partition.DiscoveryConverter->GetInternalName()); + auto it = TopicCounters.find(p.second.Topic->GetInternalName()); Y_VERIFY(it != TopicCounters.end()); it->second.PartitionsInfly.Dec(); it->second.PartitionsReleased.Inc(); @@ -520,10 +520,10 @@ void TReadSessionActor<UseMigrationProtocol>::DropPartition(typename THashMap<ui bool res = ActualPartitionActors.erase(it->second.Actor); Y_VERIFY(res); - if (--NumPartitionsFromTopic[it->second.Partition.DiscoveryConverter->GetInternalName()] == 0) { + if (--NumPartitionsFromTopic[it->second.Topic->GetInternalName()] == 0) { //ToDo[counters] - bool res = TopicCounters.erase(it->second.Partition.DiscoveryConverter->GetInternalName()); - Y_VERIFY(res); + bool res_ = TopicCounters.erase(it->second.Topic->GetInternalName()); + Y_VERIFY(res_); } if (SessionsActive) { @@ -715,7 +715,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& auto topicIter = TopicsList.ClientTopics.find(topic.topic()); Y_VERIFY(!topicIter.IsEnd()); for (const auto& converter: topicIter->second) { - const auto internalName = converter->GetInternalName(); + const auto internalName = converter->GetOriginalPath(); for (i64 pg: topic.partition_group_ids()) { if (pg < 0) { CloseSession("partition group id must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, @@ -918,8 +918,21 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk Y_VERIFY(!BalancersInitStarted); BalancersInitStarted = true; - for (auto& t : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?) - auto& topicHolder = Topics[t.TopicNameConverter->GetInternalName()]; + for (auto& [name, t] : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?) + auto internalName = t.TopicNameConverter->GetInternalName(); + auto topicGrIter = TopicGroups.find(name); + if (!topicGrIter.IsEnd()) { + auto value = std::move(topicGrIter->second); + TopicGroups.erase(topicGrIter); + TopicGroups.insert(std::make_pair(internalName, std::move(value))); + } + auto rtfsIter = ReadFromTimestamp.find(name); + if (!rtfsIter.IsEnd()) { + auto value = std::move(rtfsIter->second); + ReadFromTimestamp.erase(rtfsIter); + ReadFromTimestamp[internalName] = value; + } + auto& topicHolder = Topics[internalName]; topicHolder.TabletID = t.TabletID; topicHolder.FullConverter = t.TopicNameConverter; topicHolder.CloudId = t.CloudId; @@ -948,7 +961,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup()); } else { - for (auto& t : ev->Get()->TopicAndTablets) { + for (auto& [name, t] : ev->Get()->TopicAndTablets) { if (Topics.find(t.TopicNameConverter->GetInternalName()) == Topics.end()) { CloseSession( TStringBuilder() << "list of topics changed - new topic '" @@ -1688,7 +1701,16 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& Y_VERIFY(csize < Max<i32>()); auto jt = ReadFromTimestamp.find(it->second.Topic->GetInternalName()); - Y_VERIFY(jt != ReadFromTimestamp.end()); + if (jt == ReadFromTimestamp.end()) { + LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Error searching for topic: " << it->second.Topic->GetInternalName() + << " (" << it->second.Topic->GetPrintableString() << ")"); + for (const auto& [k, v] : ReadFromTimestamp) { + const auto& kk = k; + LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Have topic: " << kk); + } + CloseSession(TStringBuilder() << "Internal error", PersQueue::ErrorCode::ERROR, ctx); + return; + } ui64 readTimestampMs = Max(ReadTimestampMs, jt->second); TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(Reads.front()->Guid, ccount, csize, MaxTimeLagMs, readTimestampMs); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 1c53e3c5f20..399d7c73fc6 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -302,6 +302,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction { TString error; + auto status = FillProposeRequestImpl(name, GetProtoRequest()->settings(), modifyScheme, ctx, false, error, workingDir, proposal.Record.GetDatabaseName(), LocalCluster); if (!error.empty()) { @@ -312,7 +313,6 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction const auto& pqDescr = modifyScheme.GetCreatePersQueueGroup(); const auto& config = pqDescr.GetPQTabletConfig(); - if (!LocalCluster.empty() && config.GetLocalDC() && config.GetDC() != LocalCluster) { Request_->RaiseIssue(FillIssue(TStringBuilder() << "Local cluster is not correct - provided '" << config.GetDC() << "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index edcba4537bf..badfbdcd3e7 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -411,6 +411,11 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo Y_VERIFY(description.PartitionsSize() > 0); Y_VERIFY(description.HasPQTabletConfig()); InitialPQTabletConfig = description.GetPQTabletConfig(); + if (!DiscoveryConverter->IsValid()) { + errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", DiscoveryConverter->GetPrintableString().c_str()); + CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx); + return; + } FullConverter = DiscoveryConverter->UpgradeToFullConverter(InitialPQTabletConfig, AppData(ctx)->PQConfig.GetTestDatabaseRoot()); InitAfterDiscovery(ctx); @@ -1122,8 +1127,9 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) { ctx, NKikimrServices::PQ_WRITE_PROXY, "write session: cookie=" << Cookie << " sessionId=" << OwnerCookie << " userAgent=\"" << UserAgent << "\" ip=" << PeerName << " proto=v1 " - << " topic=" << DiscoveryConverter->GetInternalName() - << " durationSec=" << (ctx.Now() - StartTime).Seconds()); + << " topic=" << InitRequest.topic() + << " durationSec=" << (ctx.Now() - StartTime).Seconds() + ); LogSessionDeadline = ctx.Now() + TDuration::Hours(1) + TDuration::Seconds(rand() % 60); } |