diff options
author | gvit <gvit@ydb.tech> | 2023-03-18 00:15:50 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-03-18 00:15:50 +0300 |
commit | c4e63099985c9d18877481b71518ade583621f08 (patch) | |
tree | d48d78fc1bf5657ad5ab960f3984ddfc672e7333 | |
parent | 66e822ece4afb41d7a4bb9fea312f4665bdd2b22 (diff) | |
download | ydb-c4e63099985c9d18877481b71518ade583621f08.tar.gz |
fix idle checker in kqp proxy
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 34 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.h | 1 | ||||
-rw-r--r-- | ydb/tests/functional/api/test_session_grace_shutdown.py | 38 |
3 files changed, 63 insertions, 10 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 92a88dcdc7..0b64f1354f 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -225,35 +225,48 @@ public: PublishResourceUsage(); AskSelfNodeInfo(); SendWhiteboardRequest(); - ScheduleIdleSessionCheck(); + ScheduleIdleSessionCheck(TDuration::Seconds(2)); } TDuration GetSessionIdleDuration() const { return TDuration::Seconds(TableServiceConfig.GetSessionIdleDurationSeconds()); } - void ScheduleIdleSessionCheck() { + void ScheduleIdleSessionCheck(const TDuration& scheduleInterval) { if (!ShutdownState) { - const TDuration IdleSessionsCheckInterval = TDuration::Seconds(2); - Schedule(IdleSessionsCheckInterval, new TEvPrivate::TEvCloseIdleSessions()); + Schedule(scheduleInterval, new TEvPrivate::TEvCloseIdleSessions()); } } void Handle(TEvPrivate::TEvCloseIdleSessions::TPtr&) { - CheckIdleSessions(); - ScheduleIdleSessionCheck(); + bool hasMoreToShutdown = CheckIdleSessions(); + if (hasMoreToShutdown) { + // we already performed several session shutdowns, but there are many sessions to + // be shutdowned. so we need to speadup the process. + static const TDuration quickIdleCheckInterval = TDuration::MilliSeconds(10); + ScheduleIdleSessionCheck(quickIdleCheckInterval); + } else { + static const TDuration defaultIdleCheckInterval = TDuration::Seconds(2); + ScheduleIdleSessionCheck(defaultIdleCheckInterval); + } } - void CheckIdleSessions(const ui32 maxSessionsToClose = 10) { + bool CheckIdleSessions(const ui32 maxSessionsToClose = 10) { ui32 closedIdleSessions = 0; const NActors::TMonotonic now = TActivationContext::Monotonic(); while(true) { const TKqpSessionInfo* sessionInfo = LocalSessions->GetIdleSession(now); - if (sessionInfo == nullptr || closedIdleSessions > maxSessionsToClose) - break; + if (sessionInfo == nullptr) + return false; + Counters->ReportSessionActorClosedIdle(sessionInfo->DbCounters); + LocalSessions->StopIdleCheck(sessionInfo); SendSessionClose(sessionInfo); ++closedIdleSessions; + + if (closedIdleSessions > maxSessionsToClose) { + return true; + } } } @@ -488,6 +501,9 @@ public: } auto responseEv = MakeHolder<TEvKqp::TEvCreateSessionResponse>(); + // If we create many sessions per second, it might be ok to check and close + // several idle sessions + CheckIdleSessions(3); TProcessResult<TKqpSessionInfo*> result; TKqpDbCountersPtr dbCounters; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h index f4ef31c29b..a19fcce293 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h @@ -196,7 +196,6 @@ public: return nullptr; } - StopIdleCheck(candidate); return candidate; } diff --git a/ydb/tests/functional/api/test_session_grace_shutdown.py b/ydb/tests/functional/api/test_session_grace_shutdown.py index 583b8d834a..fb29a7924d 100644 --- a/ydb/tests/functional/api/test_session_grace_shutdown.py +++ b/ydb/tests/functional/api/test_session_grace_shutdown.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import logging +import time from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory from ydb.tests.oss.ydb_sdk_import import ydb @@ -49,3 +50,40 @@ class Test(object): assert iterations < 10 pool.release(session) + + +class TestIdle(object): + @classmethod + def setup_class(cls): + cls.cluster = kikimr_cluster_factory() + cls.cluster.start() + cls.driver_config = ydb.DriverConfig( + "%s:%s" % (cls.cluster.nodes[1].host, cls.cluster.nodes[1].port), database='/Root') + cls.driver = ydb.Driver(cls.driver_config) + cls.driver.wait(timeout=10) + + @classmethod + def teardown_class(cls): + if hasattr(cls, 'driver'): + cls.driver.stop() + + if hasattr(cls, 'cluster'): + cls.cluster.stop() + + def test_idle_shutdown_of_session(self): + self.cluster.client.add_config_item("TableServiceConfig { SessionIdleDurationSeconds: 1 }") + pool = ydb.SessionPool(self.driver, size=1000) + sessions = [pool.acquire() for _ in range(1000)] + + time.sleep(5) + closed_idle = 0 + + for session in sessions: + try: + session.transaction().execute('select 1;', commit_tx=True) + except ydb.BadSession: + closed_idle += 1 + finally: + pool.release(session) + + assert closed_idle == 1000 |