aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2022-07-14 20:41:38 +0300
committerkomels <komels@ydb.tech>2022-07-14 20:41:38 +0300
commit21ae714cb6579660d3a558b277ac920d00bbb90b (patch)
tree28c784064b596d4d88dafd9bc597303ac978d105
parent1ea164fd21212c985cc1228349a2bec63af41fb1 (diff)
downloadydb-21ae714cb6579660d3a558b277ac920d00bbb90b.tar.gz
Set topic path always in metacache
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp21
1 files changed, 18 insertions, 3 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index d0194f184f9..85d7650857f 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -31,6 +31,19 @@ IActor* CreateSchemeCache(const TActorContext& ctx, TIntrusivePtr<::NMonitoring:
return CreateSchemeBoardSchemeCache(cacheConfig.Get());
}
+void CheckEntrySetHasTopicPath(auto* scNavigate) {
+ for (auto& entry : scNavigate->ResultSet) {
+ if (entry.PQGroupInfo && entry.PQGroupInfo->Description.HasPQTabletConfig()) {
+ if (entry.PQGroupInfo->Description.GetPQTabletConfig().GetTopicPath().empty()) {
+ auto* newGroupInfo = new NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo(*entry.PQGroupInfo);
+ newGroupInfo->Description.MutablePQTabletConfig()->SetTopicPath(NKikimr::JoinPath(entry.Path));
+ entry.PQGroupInfo.Reset(newGroupInfo);
+ }
+ }
+ }
+}
+
+
class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheActor> {
using TBase = TActorBootstrapped<TPersQueueMetaCacheActor>;
public:
@@ -318,7 +331,7 @@ private:
//return true;
return SecondTryTopics.empty(); //ToDo - second try topics
}
- const std::shared_ptr<TSchemeCacheNavigate>& GetResult() {
+ std::shared_ptr<TSchemeCacheNavigate>& GetResult() {
Y_VERIFY(Result != nullptr);
return Result;
};
@@ -449,6 +462,8 @@ private:
FullTopicsCacheOutdated = true;
}
FullTopicsCache = waiter->GetResult();
+ CheckEntrySetHasTopicPath(FullTopicsCache.get());
+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size());
while (DescribeAllTopicsWaiters) {
SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front()->WaiterId, waiter->Topics, ctx);
@@ -459,11 +474,11 @@ private:
Y_VERIFY(!waiterIter.IsEnd());
Y_VERIFY(waiterIter->second->Topics.size() == navigate->ResultSet.size());
+ CheckEntrySetHasTopicPath(navigate.get());
auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{
std::move(waiter->Topics), navigate
};
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got describe topics SC response");
-
ctx.Send(waiter->WaiterId, response);
DescribeTopicsWaiters.erase(waiterIter);
}
@@ -551,7 +566,7 @@ private:
};
-IActor* CreatePQMetaCache(const ::NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) {
+IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) {
return new TPersQueueMetaCacheActor(counters, versionCheckInterval);
}