diff options
author | alexnick <alexnick@ydb.tech> | 2022-07-29 13:25:02 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-07-29 13:25:02 +0300 |
commit | c42419d12ba88adc5e1dcaac8577df094fa7d92e (patch) | |
tree | a9835c5867877a9f056802e6482fbbce25942b74 | |
parent | 7baacb9d37d5972b6def31a42c0cf969dcac05cd (diff) | |
download | ydb-c42419d12ba88adc5e1dcaac8577df094fa7d92e.tar.gz |
fix for memory leak
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.cpp | 22 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 1 |
2 files changed, 17 insertions, 6 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index c181c36cd19..1dd73918216 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -68,6 +68,11 @@ public: { } + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::PQ_META_CACHE; + } + + void Bootstrap(const TActorContext& ctx) { Become(&TPersQueueMetaCacheActor::StateFunc); @@ -117,6 +122,7 @@ private: Generation->Inc(); LastTopicKey = {}; Type = EQueryType::ECheckVersion; + //TODO: on start there will be additional delay for VersionCheckInterval ctx.Schedule(error ? QueryRetryInterval : VersionCheckInterval, new NActors::TEvents::TEvWakeup()); } @@ -396,7 +402,6 @@ private: LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Respond from cache"); return SendDescribeAllTopicsResponse(waiter, CurrentTopics, ctx); } - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Describe all topics - send request"); SendSchemeCacheRequest( std::make_shared<TWaiter>(waiter, DbRoot, false, false, CurrentTopics, EWaiterType::DescribeAllTopics), ctx @@ -418,6 +423,11 @@ private: auto schemeCacheRequest = std::make_unique<TSchemeCacheNavigate>(reqId); auto inserted = DescribeTopicsWaiters.insert(std::make_pair(reqId, waiter)).second; Y_VERIFY(inserted); + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "send request for " + << (waiter->Type == EWaiterType::DescribeAllTopics ? " all " : "") << waiter->GetTopics().size() + << " topics, got " << DescribeTopicsWaiters.size() << " requests infly"); + for (const auto& path : waiter->GetTopics()) { auto split = NKikimr::SplitPath(path); Y_VERIFY(!split.empty()); @@ -439,8 +449,10 @@ private: << ": result# " << result->ToString(*AppData()->TypeRegistry)); auto waiterIter = DescribeTopicsWaiters.find(result->Instant); Y_VERIFY(!waiterIter.IsEnd()); - auto& waiter = waiterIter->second; + auto waiter = waiterIter->second; //copy shared ptr auto res = waiter->ApplyResult(result); + DescribeTopicsWaiters.erase(waiterIter); + if (!res) { // First attempt topics failed SendSchemeCacheRequest(waiter, ctx); @@ -465,22 +477,20 @@ private: CheckEntrySetHasTopicPath(FullTopicsCache.get()); LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size()); - while (DescribeAllTopicsWaiters) { + while (!DescribeAllTopicsWaiters.empty()) { SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front()->WaiterId, waiter->Topics, ctx); DescribeAllTopicsWaiters.pop(); } } else { auto& navigate = waiter->GetResult(); - Y_VERIFY(!waiterIter.IsEnd()); - Y_VERIFY(waiterIter->second->Topics.size() == navigate->ResultSet.size()); + Y_VERIFY(waiter->Topics.size() == navigate->ResultSet.size()); CheckEntrySetHasTopicPath(navigate.get()); auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{ std::move(waiter->Topics), navigate }; LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got describe topics SC response"); ctx.Send(waiter->WaiterId, response); - DescribeTopicsWaiters.erase(waiterIter); } } diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 0140c9cae2c..69f74043b1c 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -901,5 +901,6 @@ message TActivity { KQP_STREAM_LOOKUP_ACTOR = 572; BS_STORAGE_STATS_ACTOR = 573; DS_LOAD_ACTOR = 574; + PQ_META_CACHE = 575; }; }; |