diff options
author | komels <komels@ydb.tech> | 2022-07-14 20:41:38 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2022-07-14 20:41:38 +0300 |
commit | 21ae714cb6579660d3a558b277ac920d00bbb90b (patch) | |
tree | 28c784064b596d4d88dafd9bc597303ac978d105 | |
parent | 1ea164fd21212c985cc1228349a2bec63af41fb1 (diff) | |
download | ydb-21ae714cb6579660d3a558b277ac920d00bbb90b.tar.gz |
Set topic path always in metacache
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.cpp | 21 |
1 files changed, 18 insertions, 3 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index d0194f184f9..85d7650857f 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -31,6 +31,19 @@ IActor* CreateSchemeCache(const TActorContext& ctx, TIntrusivePtr<::NMonitoring: return CreateSchemeBoardSchemeCache(cacheConfig.Get()); } +void CheckEntrySetHasTopicPath(auto* scNavigate) { + for (auto& entry : scNavigate->ResultSet) { + if (entry.PQGroupInfo && entry.PQGroupInfo->Description.HasPQTabletConfig()) { + if (entry.PQGroupInfo->Description.GetPQTabletConfig().GetTopicPath().empty()) { + auto* newGroupInfo = new NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo(*entry.PQGroupInfo); + newGroupInfo->Description.MutablePQTabletConfig()->SetTopicPath(NKikimr::JoinPath(entry.Path)); + entry.PQGroupInfo.Reset(newGroupInfo); + } + } + } +} + + class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheActor> { using TBase = TActorBootstrapped<TPersQueueMetaCacheActor>; public: @@ -318,7 +331,7 @@ private: //return true; return SecondTryTopics.empty(); //ToDo - second try topics } - const std::shared_ptr<TSchemeCacheNavigate>& GetResult() { + std::shared_ptr<TSchemeCacheNavigate>& GetResult() { Y_VERIFY(Result != nullptr); return Result; }; @@ -449,6 +462,8 @@ private: FullTopicsCacheOutdated = true; } FullTopicsCache = waiter->GetResult(); + CheckEntrySetHasTopicPath(FullTopicsCache.get()); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size()); while (DescribeAllTopicsWaiters) { SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front()->WaiterId, waiter->Topics, ctx); @@ -459,11 +474,11 @@ private: Y_VERIFY(!waiterIter.IsEnd()); Y_VERIFY(waiterIter->second->Topics.size() == navigate->ResultSet.size()); + CheckEntrySetHasTopicPath(navigate.get()); auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{ std::move(waiter->Topics), navigate }; LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got describe topics SC response"); - ctx.Send(waiter->WaiterId, response); DescribeTopicsWaiters.erase(waiterIter); } @@ -551,7 +566,7 @@ private: }; -IActor* CreatePQMetaCache(const ::NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) { +IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) { return new TPersQueueMetaCacheActor(counters, versionCheckInterval); } |