diff options
| author | komels <[email protected]> | 2022-06-23 15:43:04 +0300 | 
|---|---|---|
| committer | komels <[email protected]> | 2022-06-23 15:43:04 +0300 | 
| commit | 439ee34959ae6a27dc5ae8cc654804ef0d7eb280 (patch) | |
| tree | 5c19190d0ee730644236788319ad6026ce7e09c4 | |
| parent | 8a5992c0672644d86d714b38434563201eab91a9 (diff) | |
Refactor converters
Fix for LOGBROKER-7559
ref:0a5eeec92874536d09c3da5cd5c2f1ae8cc214ae
12 files changed, 108 insertions, 73 deletions
| diff --git a/ydb/library/persqueue/topic_parser/topic_parser.cpp b/ydb/library/persqueue/topic_parser/topic_parser.cpp index e6c181c1d36..419383f3fd9 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.cpp +++ b/ydb/library/persqueue/topic_parser/topic_parser.cpp @@ -191,19 +191,7 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr          topicPath.SkipPrefix("/");          Database = databaseBuf;      } -/* -   if (!isRootDb) { -        for (auto& rootDb: rootDatabases) { -            TStringBuf rootBuf(rootDb); -            if (!databaseBuf.empty() && IsPathPrefix(databaseBuf, rootBuf) || IsPathPrefix(topicPath, rootBuf)) { -                isRootDb = true; -                root = rootDb; -                topicPath.SkipPrefix(rootBuf); -                break; -            } -        } -    } -*/ +      OriginalTopic = topicPath;      if (!isRootDb && Database.Defined()) {          // Topic with valid non-root database. Parse as 'modern' name. Primary path is path in database. @@ -239,9 +227,8 @@ void TDiscoveryConverter::BuildForFederation(const TStringBuf& databaseBuf, TStr  TTopicConverterPtr TDiscoveryConverter::UpgradeToFullConverter(          const NKikimrPQ::TPQTabletConfig& pqTabletConfig, const TString& ydbDatabaseRootOverride  ) { - +    Y_VERIFY_S(Valid, Reason.c_str());      auto* res = new TTopicNameConverter(FstClass, PQPrefix, pqTabletConfig, ydbDatabaseRootOverride); -    res->InternalName = InternalName;      return TTopicConverterPtr(res);  } @@ -510,6 +497,14 @@ TString TDiscoveryConverter::GetPrimaryPath() const {      CHECK_VALID_AND_RETURN(PrimaryPath);  } +TString TDiscoveryConverter::GetOriginalPath() const { +    if (!OriginalPath.empty()) { +        return OriginalPath; +    } else { +        return GetPrimaryPath(); +    } +} +  const TMaybe<TString>& TDiscoveryConverter::GetSecondaryPath(const TString& database) {      if (!database.empty()) {          SetDatabase(database); @@ -538,13 +533,6 @@ void TDiscoveryConverter::SetDatabase(const TString& database) {      PendingDatabase = false;  } -TString TDiscoveryConverter::GetInternalName() const { -    if (InternalName.empty()) { -        return GetPrimaryPath(); -    } -    CHECK_VALID_AND_RETURN(InternalName); -} -  const TString& TDiscoveryConverter::GetOriginalTopic() const {      return OriginalTopic;  } @@ -655,6 +643,7 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(          Y_VERIFY(!res->Dc.empty());          Y_VERIFY(!res->FullLegacyName.empty());          res->Account = *res->Account_; +        res->InternalName = res->FullLegacyName;      }      return res;  } @@ -702,6 +691,7 @@ void TTopicNameConverter::BuildInternals(const NKikimrPQ::TPQTabletConfig& confi          ClientsideName = path;          ShortClientsideName = path;          FullModernName = path; +        InternalName = PrimaryPath;      } else {          SetDatabase(*Database);          Y_VERIFY(!FullLegacyName.empty()); @@ -716,6 +706,7 @@ void TTopicNameConverter::BuildInternals(const NKikimrPQ::TPQTabletConfig& confi              LegacyProducer = Account;          }          Y_VERIFY(!FullModernName.empty()); +        InternalName = FullLegacyName;      }  } @@ -730,15 +721,13 @@ const TString& TTopicNameConverter::GetModernName() const {  TString TTopicNameConverter::GetShortLegacyName() const {      CHECK_VALID_AND_RETURN(ShortLegacyName);  } -// -//TString TTopicNameConverter::GetFullLegacyName() const { -//    if (FstClass) { -//        return TString(""); -//    } -//    CHECK_VALID_AND_RETURN(FullLegacyName); -//} + +TString TTopicNameConverter::GetInternalName() const { +    CHECK_VALID_AND_RETURN(InternalName); +}  const TString& TTopicNameConverter::GetClientsideName() const { +    Y_VERIFY_S(Valid, Reason.c_str());      Y_VERIFY(!ClientsideName.empty());      return ClientsideName;  } @@ -789,6 +778,7 @@ TString TTopicNameConverter::GetTopicForSrcIdHash() const {  }  TString TTopicNameConverter::GetSecondaryPath() const { +    Y_VERIFY_S(Valid, Reason.c_str());      if (!FstClass) {          Y_VERIFY(SecondaryPath.Defined());          return *SecondaryPath; @@ -829,7 +819,7 @@ TTopicsToConverter TTopicsListController::GetReadTopicsList(                                               << "': " << converter->GetReason();              return;          } -        result.Topics[converter->GetInternalName()] = converter; +        result.Topics[converter->GetOriginalPath()] = converter;          result.ClientTopics[topic].push_back(converter);      }; diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 14881cfbd32..c7e5b16f9fb 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -35,7 +35,7 @@ TString MakeConsumerPath(const TString& consumer);      if (!Valid) {                         \          return TString();                 \      } else {                              \ -        Y_VERIFY(!result.empty());        \ +        Y_VERIFY_S(!result.empty(), OriginalTopic.c_str());        \          return result;                    \      } @@ -96,6 +96,8 @@ public:       */      TString GetPrimaryPath() const; +    TString GetOriginalPath() const; +      /** Second path if applicable:       * Nothing for first class       * Nothing for topic with explicitly specified non-root Database @@ -107,22 +109,11 @@ public:       */      const TMaybe<TString>& GetSecondaryPath(const TString& databasePath); -    /** Unique convertable name for internal purposes. Maybe uses hashing/for mappings. -     * Single topic in any representation is supposed to have same internal name -     * DO NOT use for business logic, such as sensors, SourceIds, etc -     * -     * /user-database/dir/my-topic -     * OR -     * /user-database/dir/.my-topic/mirrored-from-dc -     * -     * */ -    TString GetInternalName() const; -      /**      Only for special cases.      */      void SetPrimaryPath(const TString& path) { -        InternalName = PrimaryPath; +        OriginalPath = PrimaryPath;          PrimaryPath = path;      } @@ -148,8 +139,8 @@ protected:      TString PQPrefix;      //TVector<TString> RootDatabases; +    TString OriginalPath;      TString PrimaryPath; -    TString InternalName;      bool PendingDatabase = false;      TMaybe<TString> SecondaryPath;      TMaybe<TString> Account_; @@ -236,6 +227,18 @@ public:       * */      const TString& GetModernName() const; +    /** Unique convertable name for internal purposes. Maybe uses hashing/for mappings. +     * Single topic in any representation is supposed to have same internal name +     * DO NOT use for business logic, such as sensors, SourceIds, etc +     * +     * /user-database/dir/my-topic +     * OR +     * /user-database/dir/.my-topic/mirrored-from-dc +     * +     * */ +    TString GetInternalName() const; + +      TString GetTopicForSrcId() const;      TString GetTopicForSrcIdHash() const; @@ -251,6 +254,7 @@ private:      TString ClientsideName;      TString ShortClientsideName;      TString Account; +    TString InternalName;  };  using TDiscoveryConverterPtr = std::shared_ptr<TDiscoveryConverter>; diff --git a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp index b426096c93d..368949b8c23 100644 --- a/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp +++ b/ydb/library/persqueue/topic_parser/ut/topic_names_converter_ut.cpp @@ -25,7 +25,7 @@ public:          );          UNIT_ASSERT(!DiscoveryConverter->FullLegacyName.empty());          UNIT_ASSERT(!DiscoveryConverter->ShortLegacyName.empty()); -        UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath()); +        //UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath());      }      void BasicFirstClassChecks() { @@ -35,7 +35,7 @@ public:                  DiscoveryConverter->FullModernPath          ); -        UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath()); +        //UNIT_ASSERT_VALUES_EQUAL(DiscoveryConverter->GetInternalName(), DiscoveryConverter->GetPrimaryPath());      }      void SetDatabase(const TString& database) { @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {      Y_UNIT_TEST(FullLegacyPath) {          TConverterTestWrapper wrapper(false, "/Root/PQ", TString("dc1"));          wrapper.SetConverter("/Root/PQ/rt3.dc1--account--topic", "", "/Root"); -        UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic"); +        //UNIT_ASSERT_VALUES_EQUAL(wrapper.DiscoveryConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic");      }      Y_UNIT_TEST(MinimalName) { @@ -171,12 +171,12 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetCluster(), "dc1");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcId(), "rt3.dc1--account--topic");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "account--topic"); -        UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "Root/PQ/rt3.dc1--account--topic");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account--topic");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "topic");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetPrimaryPath(), "Root/PQ/rt3.dc1--account--topic");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/topic"); +        UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc1--account--topic");      }      Y_UNIT_TEST(Paths) { @@ -198,6 +198,8 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {              UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/topic");              UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc1--account@path--topic");              UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic"); +            UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc1--account@path--topic"); +          }          {              NKikimrPQ::TPQTabletConfig pqConfig; @@ -214,6 +216,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {              UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "path/.topic/mirrored-from-dc2");              UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetClientsideName(), "rt3.dc2--account@path--topic");              UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "account/path/topic"); +            UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "rt3.dc2--account@path--topic");          }      } @@ -232,6 +235,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterTest) {          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetTopicForSrcIdHash(), "lb/database/my-stream");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetModernName(), "my-stream");          UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetFederationPath(), "my-stream"); +        UNIT_ASSERT_VALUES_EQUAL(wrapper.TopicConverter->GetInternalName(), "lb/database/my-stream");      }  } @@ -244,6 +248,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {          UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account");          UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");          UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account--topic"); +        UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account--topic");          converter = TTopicNameConverter::ForFederation("/Root/PQ", "", "rt3.sas--account@dir--topic", "/Root/PQ", "/Root", false);          UNIT_ASSERT_C(converter->IsValid(), converter->GetReason()); @@ -252,6 +257,7 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {          UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyProducer(), "account@dir");          UNIT_ASSERT_VALUES_EQUAL(converter->GetLegacyLogtype(), "topic");          UNIT_ASSERT_VALUES_EQUAL(converter->GetPrimaryPath(), "Root/PQ/rt3.sas--account@dir--topic"); +        UNIT_ASSERT_VALUES_EQUAL(converter->GetInternalName(), "rt3.sas--account@dir--topic");      }      Y_UNIT_TEST(BadLegacyTopics) { diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h index 4708cf842ea..e13e5ff9d82 100644 --- a/ydb/services/lib/actors/type_definitions.h +++ b/ydb/services/lib/actors/type_definitions.h @@ -35,6 +35,6 @@ namespace NKikimr::NGRpcProxy {          TString FolderId;      }; -    using TTopicTabletsPairs = TVector<TTopicInitInfo>; +    using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>;  } //    namespace NKikimr::NGRpcProxy diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index 8b7a1dbd0e3..ee8ddb7872a 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -71,11 +71,11 @@ struct TEvPQProxy {      struct TEvAuthResultOk : public NActors::TEventLocal<TEvAuthResultOk, EvAuthResultOk> { -        TEvAuthResultOk(const TTopicTabletsPairs&& topicAndTablets) +        TEvAuthResultOk(const TTopicInitInfoMap&& topicAndTablets)              : TopicAndTablets(std::move(topicAndTablets))          { } -        TTopicTabletsPairs TopicAndTablets; +        TTopicInitInfoMap TopicAndTablets;      };      struct TEvSessionSetPreferredCluster : public NActors::TEventLocal<TEvSessionSetPreferredCluster, EvSessionSetPreferredCluster> { diff --git a/ydb/services/persqueue_v1/actors/partition_id.h b/ydb/services/persqueue_v1/actors/partition_id.h index 2f5af8673c3..64bd77be4a2 100644 --- a/ydb/services/persqueue_v1/actors/partition_id.h +++ b/ydb/services/persqueue_v1/actors/partition_id.h @@ -16,8 +16,8 @@ struct TPartitionId {      ui64 AssignId;      bool operator < (const TPartitionId& rhs) const { -        return std::make_tuple(AssignId, Partition, DiscoveryConverter->GetInternalName()) < -               std::make_tuple(rhs.AssignId, rhs.Partition, rhs.DiscoveryConverter->GetInternalName()); +        return std::make_tuple(AssignId, Partition, DiscoveryConverter->GetOriginalPath()) < +               std::make_tuple(rhs.AssignId, rhs.Partition, rhs.DiscoveryConverter->GetOriginalPath());      }  }; diff --git a/ydb/services/persqueue_v1/actors/read_info_actor.cpp b/ydb/services/persqueue_v1/actors/read_info_actor.cpp index 109debd32ba..12c63873f0a 100644 --- a/ydb/services/persqueue_v1/actors/read_info_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_info_actor.cpp @@ -98,7 +98,7 @@ void TReadInfoActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorC      NKikimrClient::TPersQueueRequest proto;      proto.MutableMetaRequest()->MutableCmdGetReadSessionsInfo()->SetClientId(ClientId); -    for (auto& t : TopicAndTablets) { +    for (auto& [_, t] : TopicAndTablets) {          proto.MutableMetaRequest()->MutableCmdGetReadSessionsInfo()->AddTopic(t.TopicNameConverter->GetClientsideName());      } diff --git a/ydb/services/persqueue_v1/actors/read_info_actor.h b/ydb/services/persqueue_v1/actors/read_info_actor.h index 307153a9ed7..3ea8360d483 100644 --- a/ydb/services/persqueue_v1/actors/read_info_actor.h +++ b/ydb/services/persqueue_v1/actors/read_info_actor.h @@ -60,7 +60,7 @@ private:      TActorId AuthInitActor; -    TTopicTabletsPairs TopicAndTablets; +    TTopicInitInfoMap TopicAndTablets;      TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index d5f0dc3da10..ba707b095d0 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -93,9 +93,16 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(      topicsIter->second.CloudId = pqDescr.GetPQTabletConfig().GetYcCloudId();      topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId();      topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId(); +    if (!topicsIter->second.DiscoveryConverter->IsValid()) { +        TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", +                                      topicsIter->second.DiscoveryConverter->GetPrintableString().c_str()); +        CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx); +        return false; +    }      topicsIter->second.FullConverter = topicsIter->second.DiscoveryConverter->UpgradeToFullConverter(              pqDescr.GetPQTabletConfig(), AppData(ctx)->PQConfig.GetTestDatabaseRoot()      ); +    Y_VERIFY(topicsIter->second.FullConverter->IsValid());      return CheckTopicACL(entry, topicsIter->first, ctx);  } @@ -107,7 +114,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon      auto i = 0u;      auto& topicsRequested = ev->Get()->TopicsRequested;      for (const auto& entry : ev->Get()->Result->ResultSet) { -        const auto& path = topicsRequested[i++]->GetInternalName(); +        const auto& path = topicsRequested[i++]->GetOriginalPath();          auto it = Topics.find(path);          Y_VERIFY(it != Topics.end()); @@ -116,8 +123,8 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon              const auto& topic = entry.ListNodeEntry->Children.at(0);              it->second.DiscoveryConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name))); -            Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second; -            Topics.erase(it); +//            Topics[it->second.DiscoveryConverter->GetInternalName()] = it->second; +//            Topics.erase(it);              reDescribe = true;              continue; @@ -241,9 +248,9 @@ bool TReadInitAndAuthActor::CheckACLPermissionsForNavigate(  void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) { -    TTopicTabletsPairs res; -    for (auto& [_, holder] : Topics) { -        res.emplace_back(decltype(res)::value_type({ +    TTopicInitInfoMap res; +    for (auto& [name, holder] : Topics) { +        res.insert(std::make_pair(name, TTopicInitInfo{              holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId          }));      } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index a329268ae85..88e2c99c78b 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -367,7 +367,7 @@ void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {          if (!p.second.Released) {              // ToDo[counters] -            auto it = TopicCounters.find(p.second.Partition.DiscoveryConverter->GetInternalName()); +            auto it = TopicCounters.find(p.second.Topic->GetInternalName());              Y_VERIFY(it != TopicCounters.end());              it->second.PartitionsInfly.Dec();              it->second.PartitionsReleased.Inc(); @@ -520,10 +520,10 @@ void TReadSessionActor<UseMigrationProtocol>::DropPartition(typename THashMap<ui      bool res = ActualPartitionActors.erase(it->second.Actor);      Y_VERIFY(res); -    if (--NumPartitionsFromTopic[it->second.Partition.DiscoveryConverter->GetInternalName()] == 0) { +    if (--NumPartitionsFromTopic[it->second.Topic->GetInternalName()] == 0) {          //ToDo[counters] -        bool res = TopicCounters.erase(it->second.Partition.DiscoveryConverter->GetInternalName()); -        Y_VERIFY(res); +        bool res_ = TopicCounters.erase(it->second.Topic->GetInternalName()); +        Y_VERIFY(res_);      }      if (SessionsActive) { @@ -715,7 +715,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&          auto topicIter = TopicsList.ClientTopics.find(topic.topic());          Y_VERIFY(!topicIter.IsEnd());          for (const auto& converter: topicIter->second) { -            const auto internalName = converter->GetInternalName(); +            const auto internalName = converter->GetOriginalPath();              for (i64 pg: topic.partition_group_ids()) {                  if (pg < 0) {                      CloseSession("partition group id must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, @@ -918,8 +918,21 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk          Y_VERIFY(!BalancersInitStarted);          BalancersInitStarted = true; -        for (auto& t : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?) -            auto& topicHolder = Topics[t.TopicNameConverter->GetInternalName()]; +        for (auto& [name, t] : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?) +            auto internalName = t.TopicNameConverter->GetInternalName(); +            auto topicGrIter = TopicGroups.find(name); +            if (!topicGrIter.IsEnd()) { +                auto value = std::move(topicGrIter->second); +                TopicGroups.erase(topicGrIter); +                TopicGroups.insert(std::make_pair(internalName, std::move(value))); +            } +            auto rtfsIter = ReadFromTimestamp.find(name); +            if (!rtfsIter.IsEnd()) { +                auto value = std::move(rtfsIter->second); +                ReadFromTimestamp.erase(rtfsIter); +                ReadFromTimestamp[internalName] = value; +            } +            auto& topicHolder = Topics[internalName];              topicHolder.TabletID = t.TabletID;              topicHolder.FullConverter = t.TopicNameConverter;              topicHolder.CloudId = t.CloudId; @@ -948,7 +961,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk          ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup());      } else { -        for (auto& t : ev->Get()->TopicAndTablets) { +        for (auto& [name, t] : ev->Get()->TopicAndTablets) {              if (Topics.find(t.TopicNameConverter->GetInternalName()) == Topics.end()) {                  CloseSession(                          TStringBuilder() << "list of topics changed - new topic '" @@ -1688,7 +1701,16 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&              Y_VERIFY(csize < Max<i32>());              auto jt = ReadFromTimestamp.find(it->second.Topic->GetInternalName()); -            Y_VERIFY(jt != ReadFromTimestamp.end()); +            if (jt == ReadFromTimestamp.end()) { +                LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Error searching for topic: " <<  it->second.Topic->GetInternalName() +                            << " (" << it->second.Topic->GetPrintableString() << ")"); +                for (const auto& [k, v] : ReadFromTimestamp) { +                    const auto& kk = k; +                    LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Have topic: " << kk); +                } +                CloseSession(TStringBuilder() << "Internal error", PersQueue::ErrorCode::ERROR, ctx); +                return; +            }              ui64 readTimestampMs = Max(ReadTimestampMs, jt->second);              TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(Reads.front()->Guid, ccount, csize, MaxTimeLagMs, readTimestampMs); diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 1c53e3c5f20..399d7c73fc6 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -302,6 +302,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction      {          TString error; +          auto status = FillProposeRequestImpl(name, GetProtoRequest()->settings(), modifyScheme, ctx, false, error,                                               workingDir, proposal.Record.GetDatabaseName(), LocalCluster);          if (!error.empty()) { @@ -312,7 +313,6 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction      const auto& pqDescr = modifyScheme.GetCreatePersQueueGroup();      const auto& config = pqDescr.GetPQTabletConfig(); -      if (!LocalCluster.empty() && config.GetLocalDC() && config.GetDC() != LocalCluster) {          Request_->RaiseIssue(FillIssue(TStringBuilder() << "Local cluster is not correct - provided '" << config.GetDC()                                      << "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index edcba4537bf..badfbdcd3e7 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -411,6 +411,11 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo      Y_VERIFY(description.PartitionsSize() > 0);      Y_VERIFY(description.HasPQTabletConfig());      InitialPQTabletConfig = description.GetPQTabletConfig(); +    if (!DiscoveryConverter->IsValid()) { +        errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", DiscoveryConverter->GetPrintableString().c_str()); +        CloseSession(errorReason, PersQueue::ErrorCode::ERROR, ctx); +        return; +    }      FullConverter = DiscoveryConverter->UpgradeToFullConverter(InitialPQTabletConfig,                                                                 AppData(ctx)->PQConfig.GetTestDatabaseRoot());      InitAfterDiscovery(ctx); @@ -1122,8 +1127,9 @@ void TWriteSessionActor::LogSession(const TActorContext& ctx) {              ctx, NKikimrServices::PQ_WRITE_PROXY,              "write session:  cookie=" << Cookie << " sessionId=" << OwnerCookie << " userAgent=\"" << UserAgent                                        << "\" ip=" << PeerName << " proto=v1 " -                                      << " topic=" << DiscoveryConverter->GetInternalName() -                                      << " durationSec=" << (ctx.Now() - StartTime).Seconds()); +                                      << " topic=" << InitRequest.topic() +                                      << " durationSec=" << (ctx.Now() - StartTime).Seconds() +    );      LogSessionDeadline = ctx.Now() + TDuration::Hours(1) + TDuration::Seconds(rand() % 60);  } | 
