diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2024-11-12 17:26:55 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 17:26:55 +0100 |
commit | 2ec8c2c79cbb8b1035a339b04d4f467b1e939c13 (patch) | |
tree | 5bf7f121025329fd8e812720428b19b8abac0103 | |
parent | 3f45718ec7e39b66a2f60f603e3614ef0fecadbe (diff) | |
download | ydb-2ec8c2c79cbb8b1035a339b04d4f467b1e939c13.tar.gz |
Fix memory leak in case of launch periodic keep-alive task for query service session (#11489)
3 files changed, 85 insertions, 7 deletions
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index c97a2509abf..5dc9cfeac69 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -90,6 +90,85 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { WaitForZeroSessions(counters); } + Y_UNIT_TEST(PeriodicTaskInSessionPool) { + auto kikimr = DefaultKikimrRunner(); + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + { + auto db = kikimr.GetQueryClient(); + + TString id; + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetSession().GetId()); + auto session = result.GetSession(); + id = session.GetId(); + + auto execResult = session.ExecuteQuery("SELECT 1;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS); + } + // This time is more then internal sdk periodic timeout but less than close session + // expect nothing happens with session in the pool + Sleep(TDuration::Seconds(10)); + + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetSession().GetId() == id); + + auto execResult = result.GetSession().ExecuteQuery("SELECT 1;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS); + } + } + WaitForZeroSessions(counters); + } + + Y_UNIT_TEST(PeriodicTaskInSessionPoolSessionCloseByIdle) { + auto kikimr = DefaultKikimrRunner(); + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); + NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters); + + { + auto settings = NYdb::NQuery::TClientSettings().SessionPoolSettings( + NYdb::NQuery::TSessionPoolSettings() + .MinPoolSize(0) + .CloseIdleThreshold(TDuration::Seconds(1))); + auto db = kikimr.GetQueryClient(settings); + + TString id; + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetSession().GetId()); + auto session = result.GetSession(); + id = session.GetId(); + + auto execResult = session.ExecuteQuery("SELECT 1;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS); + } + + Sleep(TDuration::Seconds(11)); + UNIT_ASSERT_VALUES_EQUAL(db.GetCurrentPoolSize(), 0); + + { + auto result = db.GetSession().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetSession().GetId() != id); + + auto execResult = result.GetSession().ExecuteQuery("SELECT 1;", + NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(execResult.GetStatus(), EStatus::SUCCESS); + } + } + WaitForZeroSessions(counters); + } + Y_UNIT_TEST(StreamExecuteQueryPure) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp index b04ab5ecec4..92ff1929dcd 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp @@ -284,10 +284,13 @@ TPeriodicCb TSessionPool::CreatePeriodicTask(std::weak_ptr<ISessionClient> weakC if (deletePredicate(it->second.get(), sessions.size())) { sessionsToDelete.emplace_back(std::move(it->second)); - } else { + sessions.erase(it++); + } else if (cmd) { sessionsToTouch.emplace_back(std::move(it->second)); + sessions.erase(it++); + } else { + it++; } - sessions.erase(it++); } } diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index 6cdc1a4a73c..331f17ec429 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -484,15 +484,11 @@ public: return false; }; - // No need to keep-alive - auto keepAliveCmd = [](TKqpSessionCommon*) { - }; - std::weak_ptr<TQueryClient::TImpl> weak = shared_from_this(); Connections_->AddPeriodicTask( SessionPool_.CreatePeriodicTask( weak, - std::move(keepAliveCmd), + NSessionPool::TSessionPool::TKeepAliveCmd(), // no keep-alive cmd for query service std::move(deletePredicate) ), NSessionPool::PERIODIC_ACTION_INTERVAL); } |