aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-07-29 13:25:02 +0300
committeralexnick <alexnick@ydb.tech>2022-07-29 13:25:02 +0300
commitc42419d12ba88adc5e1dcaac8577df094fa7d92e (patch)
treea9835c5867877a9f056802e6482fbbce25942b74
parent7baacb9d37d5972b6def31a42c0cf969dcac05cd (diff)
downloadydb-c42419d12ba88adc5e1dcaac8577df094fa7d92e.tar.gz
fix for memory leak
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp22
-rw-r--r--ydb/core/protos/services.proto1
2 files changed, 17 insertions, 6 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index c181c36cd19..1dd73918216 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -68,6 +68,11 @@ public:
{
}
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::PQ_META_CACHE;
+ }
+
+
void Bootstrap(const TActorContext& ctx) {
Become(&TPersQueueMetaCacheActor::StateFunc);
@@ -117,6 +122,7 @@ private:
Generation->Inc();
LastTopicKey = {};
Type = EQueryType::ECheckVersion;
+ //TODO: on start there will be additional delay for VersionCheckInterval
ctx.Schedule(error ? QueryRetryInterval : VersionCheckInterval, new NActors::TEvents::TEvWakeup());
}
@@ -396,7 +402,6 @@ private:
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Respond from cache");
return SendDescribeAllTopicsResponse(waiter, CurrentTopics, ctx);
}
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Describe all topics - send request");
SendSchemeCacheRequest(
std::make_shared<TWaiter>(waiter, DbRoot, false, false, CurrentTopics, EWaiterType::DescribeAllTopics),
ctx
@@ -418,6 +423,11 @@ private:
auto schemeCacheRequest = std::make_unique<TSchemeCacheNavigate>(reqId);
auto inserted = DescribeTopicsWaiters.insert(std::make_pair(reqId, waiter)).second;
Y_VERIFY(inserted);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "send request for "
+ << (waiter->Type == EWaiterType::DescribeAllTopics ? " all " : "") << waiter->GetTopics().size()
+ << " topics, got " << DescribeTopicsWaiters.size() << " requests infly");
+
for (const auto& path : waiter->GetTopics()) {
auto split = NKikimr::SplitPath(path);
Y_VERIFY(!split.empty());
@@ -439,8 +449,10 @@ private:
<< ": result# " << result->ToString(*AppData()->TypeRegistry));
auto waiterIter = DescribeTopicsWaiters.find(result->Instant);
Y_VERIFY(!waiterIter.IsEnd());
- auto& waiter = waiterIter->second;
+ auto waiter = waiterIter->second; //copy shared ptr
auto res = waiter->ApplyResult(result);
+ DescribeTopicsWaiters.erase(waiterIter);
+
if (!res) {
// First attempt topics failed
SendSchemeCacheRequest(waiter, ctx);
@@ -465,22 +477,20 @@ private:
CheckEntrySetHasTopicPath(FullTopicsCache.get());
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size());
- while (DescribeAllTopicsWaiters) {
+ while (!DescribeAllTopicsWaiters.empty()) {
SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front()->WaiterId, waiter->Topics, ctx);
DescribeAllTopicsWaiters.pop();
}
} else {
auto& navigate = waiter->GetResult();
- Y_VERIFY(!waiterIter.IsEnd());
- Y_VERIFY(waiterIter->second->Topics.size() == navigate->ResultSet.size());
+ Y_VERIFY(waiter->Topics.size() == navigate->ResultSet.size());
CheckEntrySetHasTopicPath(navigate.get());
auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{
std::move(waiter->Topics), navigate
};
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got describe topics SC response");
ctx.Send(waiter->WaiterId, response);
- DescribeTopicsWaiters.erase(waiterIter);
}
}
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 0140c9cae2c..69f74043b1c 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -901,5 +901,6 @@ message TActivity {
KQP_STREAM_LOOKUP_ACTOR = 572;
BS_STORAGE_STATS_ACTOR = 573;
DS_LOAD_ACTOR = 574;
+ PQ_META_CACHE = 575;
};
};