aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2024-11-12 17:26:55 +0100
committerGitHub <noreply@github.com>2024-11-12 17:26:55 +0100
commit2ec8c2c79cbb8b1035a339b04d4f467b1e939c13 (patch)
tree5bf7f121025329fd8e812720428b19b8abac0103
parent3f45718ec7e39b66a2f60f603e3614ef0fecadbe (diff)
downloadydb-2ec8c2c79cbb8b1035a339b04d4f467b1e939c13.tar.gz
Fix memory leak in case of launch periodic keep-alive task for query service session (#11489)
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp79
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp6
3 files changed, 85 insertions, 7 deletions
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index c97a2509abf..5dc9cfeac69 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -90,6 +90,85 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
WaitForZeroSessions(counters);
}
+ Y_UNIT_TEST(PeriodicTaskInSessionPool) {
+ auto kikimr = DefaultKikimrRunner();
+ auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ {
+ auto db = kikimr.GetQueryClient();
+
+ TString id;
+ {
+ auto result = db.GetSession().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetSession().GetId());
+ auto session = result.GetSession();
+ id = session.GetId();
+
+ auto execResult = session.ExecuteQuery("SELECT 1;",
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
+ }
+ // This time is more then internal sdk periodic timeout but less than close session
+ // expect nothing happens with session in the pool
+ Sleep(TDuration::Seconds(10));
+
+ {
+ auto result = db.GetSession().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetSession().GetId() == id);
+
+ auto execResult = result.GetSession().ExecuteQuery("SELECT 1;",
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
+ }
+ }
+ WaitForZeroSessions(counters);
+ }
+
+ Y_UNIT_TEST(PeriodicTaskInSessionPoolSessionCloseByIdle) {
+ auto kikimr = DefaultKikimrRunner();
+ auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint());
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ {
+ auto settings = NYdb::NQuery::TClientSettings().SessionPoolSettings(
+ NYdb::NQuery::TSessionPoolSettings()
+ .MinPoolSize(0)
+ .CloseIdleThreshold(TDuration::Seconds(1)));
+ auto db = kikimr.GetQueryClient(settings);
+
+ TString id;
+ {
+ auto result = db.GetSession().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetSession().GetId());
+ auto session = result.GetSession();
+ id = session.GetId();
+
+ auto execResult = session.ExecuteQuery("SELECT 1;",
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
+ }
+
+ Sleep(TDuration::Seconds(11));
+ UNIT_ASSERT_VALUES_EQUAL(db.GetCurrentPoolSize(), 0);
+
+ {
+ auto result = db.GetSession().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetSession().GetId() != id);
+
+ auto execResult = result.GetSession().ExecuteQuery("SELECT 1;",
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS);
+ }
+ }
+ WaitForZeroSessions(counters);
+ }
+
Y_UNIT_TEST(StreamExecuteQueryPure) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
index b04ab5ecec4..92ff1929dcd 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
@@ -284,10 +284,13 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakC
if (deletePredicate(it->second.get(), sessions.size())) {
sessionsToDelete.emplace_back(std::move(it->second));
- } else {
+ sessions.erase(it++);
+ } else if (cmd) {
sessionsToTouch.emplace_back(std::move(it->second));
+ sessions.erase(it++);
+ } else {
+ it++;
}
- sessions.erase(it++);
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index 6cdc1a4a73c..331f17ec429 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -484,15 +484,11 @@ public:
return false;
};
- // No need to keep-alive
- auto keepAliveCmd = [](TKqpSessionCommon*) {
- };
-
std::weak_ptr<TQueryClient::TImpl> weak = shared_from_this();
Connections_->AddPeriodicTask(
SessionPool_.CreatePeriodicTask(
weak,
- std::move(keepAliveCmd),
+ NSessionPool::TSessionPool::TKeepAliveCmd(), // no keep-alive cmd for query service
std::move(deletePredicate)
), NSessionPool::PERIODIC_ACTION_INTERVAL);
}