diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-07-21 10:29:59 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-07-21 10:29:59 +0300 |
commit | f4ea7717398bf9fb9b334e265b41dfc4f54d1096 (patch) | |
tree | a1d5516eab84818c45ee74cfde998613adc3f362 | |
parent | 91b32fadb3237b325674c27da4c48d741d7dcda3 (diff) | |
download | ydb-f4ea7717398bf9fb9b334e265b41dfc4f54d1096.tar.gz |
Split TableService session for common and table service part. KIKIMR-18788
25 files changed, 424 insertions, 225 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt index 7eb8172a98a..4cbaeff1960 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt @@ -9,6 +9,7 @@ add_subdirectory(common) add_subdirectory(db_driver_state) add_subdirectory(grpc_connections) +add_subdirectory(kqp_session_common) add_subdirectory(logger) add_subdirectory(make_request) add_subdirectory(plain_status) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..8372e613afe --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(impl-ydb_internal-kqp_session_common) +target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC + contrib-libs-cxxsupp + yutil + cpp-threading-future + lib-operation_id-protos + client-impl-ydb_endpoints +) +target_sources(impl-ydb_internal-kqp_session_common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..0747556da90 --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(impl-ydb_internal-kqp_session_common) +target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-threading-future + lib-operation_id-protos + client-impl-ydb_endpoints +) +target_sources(impl-ydb_internal-kqp_session_common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..0747556da90 --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(impl-ydb_internal-kqp_session_common) +target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-threading-future + lib-operation_id-protos + client-impl-ydb_endpoints +) +target_sources(impl-ydb_internal-kqp_session_common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..8372e613afe --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(impl-ydb_internal-kqp_session_common) +target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC + contrib-libs-cxxsupp + yutil + cpp-threading-future + lib-operation_id-protos + client-impl-ydb_endpoints +) +target_sources(impl-ydb_internal-kqp_session_common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp +) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp new file mode 100644 index 00000000000..a4cf92fd617 --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp @@ -0,0 +1,177 @@ +#include "kqp_session_common.h" + +#include <ydb/public/lib/operation_id/operation_id.h> + +namespace NYdb { + +static ui64 GetNodeIdFromSession(const TStringType& sessionId) { + if (sessionId.empty()) { + return 0; + } + + try { + NKikimr::NOperationId::TOperationId opId(sessionId); + const auto& nodeIds = opId.GetValue("node_id"); + if (nodeIds.size() != 1) { + return 0; + } + + return FromStringWithDefault<ui64>(*nodeIds[0], 0); + + } catch (...) { + return 0; + } + return 0; +} + +TKqpSessionCommon::TKqpSessionCommon( + const TStringType& sessionId, const TStringType& endpoint, + bool isOwnedBySessionPool) + : Lock_() + , SessionId_(sessionId) + , EndpointKey_(endpoint, GetNodeIdFromSession(sessionId)) + , IsOwnedBySessionPool_(isOwnedBySessionPool) + , State_(S_STANDALONE) + , TimeToTouch_(TInstant::Now()) + , TimeInPast_(TInstant::Now()) + , NeedUpdateActiveCounter_(false) +{} + +TKqpSessionCommon::~TKqpSessionCommon() { + Unlink(); +} + +const TStringType& TKqpSessionCommon::GetId() const { + return SessionId_; +} + +const TStringType& TKqpSessionCommon::GetEndpoint() const { + return EndpointKey_.GetEndpoint(); +} + +const TEndpointKey& TKqpSessionCommon::GetEndpointKey() const { + return EndpointKey_; +} + +// Can be called from interceptor, need lock +void TKqpSessionCommon::MarkBroken() { + with_lock(Lock_) { + if (State_ == EState::S_ACTIVE) { + NeedUpdateActiveCounter_ = true; + } + State_ = EState::S_BROKEN; + } +} + +void TKqpSessionCommon::MarkAsClosing() { + with_lock(Lock_) { + if (State_ == EState::S_ACTIVE) { + NeedUpdateActiveCounter_ = true; + } + + State_ = EState::S_CLOSING; + } +} + +void TKqpSessionCommon::MarkStandalone() { + State_ = EState::S_STANDALONE; + NeedUpdateActiveCounter_ = false; +} + +void TKqpSessionCommon::MarkActive() { + State_ = EState::S_ACTIVE; + NeedUpdateActiveCounter_ = false; +} + +void TKqpSessionCommon::MarkIdle() { + State_ = EState::S_IDLE; + NeedUpdateActiveCounter_ = false; +} + +bool TKqpSessionCommon::IsOwnedBySessionPool() const { + return IsOwnedBySessionPool_; +} + +TKqpSessionCommon::EState TKqpSessionCommon::GetState() const { + // See comments in InjectSessionStatusInterception about lock + with_lock(Lock_) { + return State_; + } +} + +void TKqpSessionCommon::SetNeedUpdateActiveCounter(bool flag) { + NeedUpdateActiveCounter_ = flag; +} + +bool TKqpSessionCommon::NeedUpdateActiveCounter() const { + return NeedUpdateActiveCounter_; +} + +// We need lock here because this method can be called from different thread +// if client makes simultaneous calls on one session. +// It should be possible to rewrite this part. +void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval, + bool updateTimeInPast) +{ + auto now = TInstant::Now(); + with_lock(Lock_) { + if (updateTimeInPast) { + TimeInPast_ = now; + } + TimeToTouch_ = now + interval; + } +} + +void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval, + bool updateTimeInPast) +{ + auto now = TInstant::Now(); + if (updateTimeInPast) { + TimeInPast_ = now; + } + TimeToTouch_ = now + interval; +} + +TInstant TKqpSessionCommon::GetTimeToTouchFast() const { + return TimeToTouch_; +} + +TInstant TKqpSessionCommon::GetTimeInPastFast() const { + return TimeInPast_; +} + +// SetTimeInterval/GetTimeInterval, are not atomic! +void TKqpSessionCommon::SetTimeInterval(TDuration interval) { + TimeInterval_ = interval; +} + +TDuration TKqpSessionCommon::GetTimeInterval() const { + return TimeInterval_; +} + +//////////////////////////////////////////////////////////////////////////////// + +std::function<void(TKqpSessionCommon*)> TKqpSessionCommon::GetSmartDeleter( + std::shared_ptr<ISessionClient> client) +{ + return [client](TKqpSessionCommon* sessionImpl) { + switch (sessionImpl->GetState()) { + case TKqpSessionCommon::S_STANDALONE: + case TKqpSessionCommon::S_BROKEN: + case TKqpSessionCommon::S_CLOSING: + client->DeleteSession(sessionImpl); + break; + case TKqpSessionCommon::S_IDLE: + case TKqpSessionCommon::S_ACTIVE: { + if (!client->ReturnSession(sessionImpl)) { + client->DeleteSession(sessionImpl); + } + break; + } + default: + break; + } + }; +} + +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h new file mode 100644 index 00000000000..58d59b56cab --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h @@ -0,0 +1,78 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/type_switcher.h> + +#include <util/datetime/base.h> +#include <util/system/spinlock.h> + +#include <functional> + +namespace NYdb { + +//////////////////////////////////////////////////////////////////////////////// + +class TKqpSessionCommon : public TEndpointObj { +public: + TKqpSessionCommon(const TStringType& sessionId, const TStringType& endpoint, + bool isOwnedBySessionPool); + + enum EState { + S_STANDALONE, + S_IDLE, + S_BROKEN, + S_ACTIVE, + S_CLOSING + }; + +public: + ~TKqpSessionCommon(); + + const TStringType& GetId() const; + const TStringType& GetEndpoint() const; + const TEndpointKey& GetEndpointKey() const; + void MarkBroken(); + void MarkAsClosing(); + void MarkStandalone(); + void MarkActive(); + void MarkIdle(); + bool IsOwnedBySessionPool() const; + EState GetState() const; + void SetNeedUpdateActiveCounter(bool flag); + bool NeedUpdateActiveCounter() const; + void InvalidateQueryInCache(const TStringType& key); + void InvalidateQueryCache(); + void ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast); + void ScheduleTimeToTouchFast(TDuration interval, bool updateTimeInPast); + TInstant GetTimeToTouchFast() const; + TInstant GetTimeInPastFast() const; + + // SetTimeInterval/GetTimeInterval, are not atomic! + void SetTimeInterval(TDuration interval); + TDuration GetTimeInterval() const; + + static std::function<void(TKqpSessionCommon*)> + GetSmartDeleter(std::shared_ptr<ISessionClient> client); + +protected: + TAdaptiveLock Lock_; + +private: + const TStringType SessionId_; + const TEndpointKey EndpointKey_; + const bool IsOwnedBySessionPool_; + + EState State_; + TInstant TimeToTouch_; + TInstant TimeInPast_; + // Is used to implement progressive timeout for settler keep alive call + TDuration TimeInterval_; + // Indicate session was in active state, but state was changed + // (need to decrement active session counter) + // TODO: suboptimal because need lock for atomic change from interceptor + // Rewrite with bit field + bool NeedUpdateActiveCounter_; +}; + +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/ya.make b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/ya.make new file mode 100644 index 00000000000..ac4ebdf6e06 --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + kqp_session_common.cpp +) + +PEERDIR( + library/cpp/threading/future + ydb/public/lib/operation_id/protos + ydb/public/sdk/cpp/client/impl/ydb_endpoints +) + + +END() diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h new file mode 100644 index 00000000000..ab6393261cf --- /dev/null +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h @@ -0,0 +1,20 @@ +#pragma once + +#include <functional> + +#include <util/datetime/base.h> + +namespace NYdb { + +class TKqpSessionCommon; + +class ISessionClient { +public: + virtual ~ISessionClient() = default; + virtual void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) = 0; + virtual void DeleteSession(TKqpSessionCommon* sessionImpl) = 0; + // TODO: Try to remove from ISessionClient + virtual bool ReturnSession(TKqpSessionCommon* sessionImpl) = 0; +}; + +} diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt index 4fd0fe3cd58..21b159dc8b3 100644 --- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt +++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt @@ -1 +1 @@ -2.5.1
\ No newline at end of file +2.5.2
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt index 5d35f202cc5..33b6f4812af 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC tools-enum_parser-enum_serialization_runtime api-protos impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common cpp-client-ydb_driver cpp-client-ydb_params cpp-client-ydb_proto diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt index cc4ba76506e..241f404ed39 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC tools-enum_parser-enum_serialization_runtime api-protos impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common cpp-client-ydb_driver cpp-client-ydb_params cpp-client-ydb_proto diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt index cc4ba76506e..241f404ed39 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC tools-enum_parser-enum_serialization_runtime api-protos impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common cpp-client-ydb_driver cpp-client-ydb_params cpp-client-ydb_proto diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt index 5d35f202cc5..33b6f4812af 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC tools-enum_parser-enum_serialization_runtime api-protos impl-ydb_internal-make_request + impl-ydb_internal-kqp_session_common cpp-client-ydb_driver cpp-client-ydb_params cpp-client-ydb_proto diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp index a15d9d9fa13..772e7c4aa36 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp @@ -6,109 +6,12 @@ namespace NYdb { namespace NTable { -ui64 GetNodeIdFromSession(const TString& sessionId) { - if (sessionId.empty()) { - return 0; - } - - try { - NKikimr::NOperationId::TOperationId opId(sessionId); - const auto& nodeIds = opId.GetValue("node_id"); - if (nodeIds.size() != 1) { - return 0; - } - - return FromStringWithDefault<ui64>(*nodeIds[0], 0); - - } catch (...) { - return 0; - } - return 0; -} - TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool) - : SessionId_(sessionId) - , EndpointKey_(endpoint, GetNodeIdFromSession(sessionId)) - , State_(S_STANDALONE) + : TKqpSessionCommon(sessionId, endpoint, isOwnedBySessionPool) , UseQueryCache_(useQueryCache) , QueryCache_(queryCacheSize) - , Lock_() - , TimeToTouch_(TInstant::Now()) - , TimeInPast_(TInstant::Now()) - , NeedUpdateActiveCounter_(false) - , IsOwnedBySessionPool_(isOwnedBySessionPool) {} -TSession::TImpl::~TImpl() { - Unlink(); -} - -const TString& TSession::TImpl::GetId() const { - return SessionId_; -} - -const TString& TSession::TImpl::GetEndpoint() const { - return EndpointKey_.GetEndpoint(); -} - -const TEndpointKey& TSession::TImpl::GetEndpointKey() const { - return EndpointKey_; -} - -// Can be called from interceptor, need lock -void TSession::TImpl::MarkBroken() { - with_lock(Lock_) { - if (State_ == EState::S_ACTIVE) { - NeedUpdateActiveCounter_ = true; - } - State_ = EState::S_BROKEN; - } -} - -void TSession::TImpl::MarkAsClosing() { - with_lock(Lock_) { - if (State_ == EState::S_ACTIVE) { - NeedUpdateActiveCounter_ = true; - } - - State_ = EState::S_CLOSING; - } -} - -void TSession::TImpl::MarkStandalone() { - State_ = EState::S_STANDALONE; - NeedUpdateActiveCounter_ = false; -} - -void TSession::TImpl::MarkActive() { - State_ = EState::S_ACTIVE; - NeedUpdateActiveCounter_ = false; -} - -void TSession::TImpl::MarkIdle() { - State_ = EState::S_IDLE; - NeedUpdateActiveCounter_ = false; -} - -bool TSession::TImpl::IsOwnedBySessionPool() const { - return IsOwnedBySessionPool_; -} - -TSession::TImpl::EState TSession::TImpl::GetState() const { - // See comments in InjectSessionStatusInterception about lock - with_lock(Lock_) { - return State_; - } -} - -void TSession::TImpl::SetNeedUpdateActiveCounter(bool flag) { - NeedUpdateActiveCounter_ = flag; -} - -bool TSession::TImpl::NeedUpdateActiveCounter() const { - return NeedUpdateActiveCounter_; -} - void TSession::TImpl::InvalidateQueryInCache(const TString& key) { if (!UseQueryCache_) { return; @@ -172,43 +75,6 @@ void TSession::TImpl::AddQueryToCache(const TDataQuery& query) { } } -// We need lock here because this method can be called from different thread if client -// makes simultaneous calls on one session. It should be possible to rewrite this part. -void TSession::TImpl::ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast) { - auto now = TInstant::Now(); - with_lock(Lock_) { - if (updateTimeInPast) { - TimeInPast_ = now; - } - TimeToTouch_ = now + interval; - } -} - -void TSession::TImpl::ScheduleTimeToTouchFast(TDuration interval, bool updateTimeInPast) { - auto now = TInstant::Now(); - if (updateTimeInPast) { - TimeInPast_ = now; - } - TimeToTouch_ = now + interval; -} - -TInstant TSession::TImpl::GetTimeToTouchFast() const { - return TimeToTouch_; -} - -TInstant TSession::TImpl::GetTimeInPastFast() const { - return TimeInPast_; -} - -// SetTimeInterval/GetTimeInterval, are not atomic! -void TSession::TImpl::SetTimeInterval(TDuration interval) { - TimeInterval_ = interval; -} - -TDuration TSession::TImpl::GetTimeInterval() const { - return TimeInterval_; -} - const TLRUCache<TString, TSession::TImpl::TDataQueryInfo>& TSession::TImpl::GetQueryCacheUnsafe() const { return QueryCache_; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h index 9fb0330fe4d..4b9d482f604 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h> #include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h> #include <ydb/public/lib/operation_id/operation_id.h> @@ -20,7 +21,7 @@ namespace NTable { using TSessionInspectorFn = std::function<void(TAsyncCreateSessionResult future)>; -class TSession::TImpl : public TEndpointObj { +class TSession::TImpl : public TKqpSessionCommon { friend class TTableClient; friend class TSession; @@ -29,14 +30,6 @@ public: #endif TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool); public: - enum EState { - S_STANDALONE, - S_IDLE, - S_BROKEN, - S_ACTIVE, - S_CLOSING - }; - struct TDataQueryInfo { TString QueryId; ::google::protobuf::Map<TString, Ydb::Type> ParameterTypes; @@ -49,37 +42,15 @@ public: , ParameterTypes(parameterTypes) {} }; public: - ~TImpl(); - - const TString& GetId() const; - const TString& GetEndpoint() const; - const TEndpointKey& GetEndpointKey() const; - void MarkBroken(); - void MarkAsClosing(); - void MarkStandalone(); - void MarkActive(); - void MarkIdle(); - bool IsOwnedBySessionPool() const; - EState GetState() const; - void SetNeedUpdateActiveCounter(bool flag); - bool NeedUpdateActiveCounter() const; + ~TImpl() = default; + void InvalidateQueryInCache(const TString& key); void InvalidateQueryCache(); TMaybe<TDataQueryInfo> GetQueryFromCache(const TString& query, bool allowMigration); void AddQueryToCache(const TDataQuery& query); - void ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast); - void ScheduleTimeToTouchFast(TDuration interval, bool updateTimeInPast); - TInstant GetTimeToTouchFast() const; - TInstant GetTimeInPastFast() const; - - // SetTimeInterval/GetTimeInterval, are not atomic! - void SetTimeInterval(TDuration interval); - TDuration GetTimeInterval() const; const TLRUCache<TString, TDataQueryInfo>& GetQueryCacheUnsafe() const; - static std::function<void(TSession::TImpl*)> GetSmartDeleter(std::shared_ptr<TTableClient::TImpl> client); - static TSessionInspectorFn GetSessionInspector( NThreading::TPromise<TCreateSessionResult>& promise, std::shared_ptr<TTableClient::TImpl> client, @@ -87,21 +58,8 @@ public: ui32 counter, bool needUpdateActiveSessionCounter); private: - const TString SessionId_; - const TEndpointKey EndpointKey_; - EState State_; bool UseQueryCache_; TLRUCache<TString, TDataQueryInfo> QueryCache_; - TAdaptiveLock Lock_; - TInstant TimeToTouch_; - TInstant TimeInPast_; - // Is used to implement progressive timeout for settler keep alive call - TDuration TimeInterval_; - // Indicate session was in active state, but state was changed (need to decrement active session counter) - // TODO: suboptimal because need lock for atomic change from interceptor - // Rewrite with bit field - bool NeedUpdateActiveCounter_; - const bool IsOwnedBySessionPool_; }; } // namespace NTable diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp index 35f91b69b0d..21fe00039b4 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp @@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) { CurHost_ = nodeId; } -bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const { +bool TRequestMigrator::IsOurSession(const TKqpSessionCommon* session) const { if (!CurHost_) return false; @@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() { } } -bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) { +bool TRequestMigrator::DoCheckAndMigrate(const TKqpSessionCommon* session) { if (session->GetEndpoint().empty()) return false; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h index fbe6875b3aa..b2b146047b7 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h @@ -51,13 +51,13 @@ public: // Returns false if session is not suitable or unable to get lock to start migration // Returns true if session is suitable in this case Unlink methos on the session is called // This methos is thread safe. - bool DoCheckAndMigrate(const TSession::TImpl* session); + bool DoCheckAndMigrate(const TKqpSessionCommon* session); // Reset migrator to initiall state if migration was not started and returns true // Returns false if migration was started bool Reset(); private: - bool IsOurSession(const TSession::TImpl* session) const; + bool IsOurSession(const TKqpSessionCommon* session) const; ui64 CurHost_ = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp index cdd314c787f..53e98fde737 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp @@ -88,7 +88,7 @@ void TSessionPool::CreateFakeSession( } void TSessionPool::MakeSessionPromiseFromSession( - TSession::TImpl* session, + TKqpSessionCommon* session, NThreading::TPromise<TCreateSessionResult>& promise, std::shared_ptr<TTableClient::TImpl> client ) { @@ -102,7 +102,8 @@ void TSessionPool::MakeSessionPromiseFromSession( TSession( client, std::shared_ptr<TSession::TImpl>( - session, TSession::TImpl::GetSmartDeleter(client) + static_cast<TSession::TImpl*>(session), + TSession::TImpl::GetSmartDeleter(client) ) ) ); @@ -201,7 +202,7 @@ bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TI return true; } -bool TSessionPool::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) { +bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) { // Do not set promise under the session pool lock NThreading::TPromise<TCreateSessionResult> createSessionPromise; { @@ -214,7 +215,9 @@ bool TSessionPool::ReturnSession(TSession::TImpl* impl, bool active, std::shared if (!active) IncrementActiveCounterUnsafe(); } else { - Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl)); + Sessions_.emplace(std::make_pair( + impl->GetTimeToTouchFast(), + static_cast<TSession::TImpl*>(impl))); if (active) { Y_VERIFY(ActiveSessions_); diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h index 7a83b4115bb..702fe185f74 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h @@ -48,7 +48,7 @@ public: std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings); // Returns true if session returned to pool successfully - bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client); + bool ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client); // Returns trun if has waiter and scheduled to create new session // too feed it bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client, bool active); @@ -67,7 +67,7 @@ public: private: void UpdateStats(); void MakeSessionPromiseFromSession( - TSession::TImpl* session, + TKqpSessionCommon* session, NThreading::TPromise<TCreateSessionResult>& promise, std::shared_ptr<TTableClient::TImpl> client ); diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp index 4cd1ec054e1..3ddb4495c4f 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp @@ -824,7 +824,7 @@ TAsyncReadRowsResult TTableClient::TImpl::ReadRows(const TString& path, TValue&& return promise.GetFuture(); } -TAsyncStatus TTableClient::TImpl::Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings) { +TAsyncStatus TTableClient::TImpl::Close(const TKqpSessionCommon* sessionImpl, const TCloseSessionSettings& settings) { auto request = MakeOperationRequest<Ydb::Table::DeleteSessionRequest>(settings); request.set_session_id(sessionImpl->GetId()); return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>( @@ -834,7 +834,7 @@ TAsyncStatus TTableClient::TImpl::Close(const TSession::TImpl* sessionImpl, cons sessionImpl->GetEndpointKey()); } -TAsyncStatus TTableClient::TImpl::CloseInternal(const TSession::TImpl* sessionImpl) { +TAsyncStatus TTableClient::TImpl::CloseInternal(const TKqpSessionCommon* sessionImpl) { static const auto internalCloseSessionSettings = TCloseSessionSettings() .ClientTimeout(TDuration::Seconds(2)); @@ -847,7 +847,7 @@ TAsyncStatus TTableClient::TImpl::CloseInternal(const TSession::TImpl* sessionIm }); } -bool TTableClient::TImpl::ReturnSession(TSession::TImpl* sessionImpl) { +bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) { Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE || sessionImpl->GetState() == TSession::TImpl::S_IDLE); @@ -867,7 +867,7 @@ bool TTableClient::TImpl::ReturnSession(TSession::TImpl* sessionImpl) { return true; } -void TTableClient::TImpl::DeleteSession(TSession::TImpl* sessionImpl) { +void TTableClient::TImpl::DeleteSession(TKqpSessionCommon* sessionImpl) { // Closing not owned by session pool session should not fire getting new session if (sessionImpl->IsOwnedBySessionPool()) { if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this(), sessionImpl->NeedUpdateActiveCounter())) { diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h index 73b8ee12d9b..d9c9161f195 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h +++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h @@ -1,6 +1,7 @@ #pragma once #define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/scheme_helpers/helpers.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> @@ -96,7 +97,7 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception( } -class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public IMigratorClient { +class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public ISessionClient { public: using TReadTableStreamProcessorPtr = TTablePartIterator::TReaderImpl::TStreamProcessorPtr; using TScanQueryProcessorPtr = TScanQueryPartIterator::TReaderImpl::TStreamProcessorPtr; @@ -109,7 +110,7 @@ public: NThreading::TFuture<void> Drain(); NThreading::TFuture<void> Stop(); void ScheduleTask(const std::function<void()>& fn, TDuration timeout); - void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout); + void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) override; void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn); void StartPeriodicSessionPoolTask(); static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client); @@ -194,11 +195,11 @@ public: const TReadTableSettings& settings); TAsyncReadRowsResult ReadRows(const TString& path, TValue&& keys, const TVector<TString>& columns, const TReadRowsSettings& settings); - TAsyncStatus Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings); - TAsyncStatus CloseInternal(const TSession::TImpl* sessionImpl); + TAsyncStatus Close(const TKqpSessionCommon* sessionImpl, const TCloseSessionSettings& settings); + TAsyncStatus CloseInternal(const TKqpSessionCommon* sessionImpl); - bool ReturnSession(TSession::TImpl* sessionImpl); - void DeleteSession(TSession::TImpl* sessionImpl); + bool ReturnSession(TKqpSessionCommon* sessionImpl) override; + void DeleteSession(TKqpSessionCommon* sessionImpl) override; ui32 GetSessionRetryLimit() const; static void CloseAndDeleteSession( std::unique_ptr<TSession::TImpl>&& impl, diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 461a41a7c85..10209fce5b9 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -2372,29 +2372,6 @@ const TMaybe<TQueryStats>& TCommitTransactionResult::GetStats() const { //////////////////////////////////////////////////////////////////////////////// -std::function<void(TSession::TImpl*)> TSession::TImpl::GetSmartDeleter(std::shared_ptr<TTableClient::TImpl> client) { - return [client](TSession::TImpl* sessionImpl) { - switch (sessionImpl->GetState()) { - case TSession::TImpl::S_STANDALONE: - case TSession::TImpl::S_BROKEN: - case TSession::TImpl::S_CLOSING: - client->DeleteSession(sessionImpl); - break; - case TSession::TImpl::S_IDLE: - case TSession::TImpl::S_ACTIVE: { - if (!client->ReturnSession(sessionImpl)) { - client->DeleteSession(sessionImpl); - } - break; - } - default: - break; - } - }; -} - -//////////////////////////////////////////////////////////////////////////////// - TCopyItem::TCopyItem(const TString& source, const TString& destination) : Source_(source) , Destination_(destination) diff --git a/ydb/public/sdk/cpp/client/ydb_table/ya.make b/ydb/public/sdk/cpp/client/ydb_table/ya.make index c998ef545a8..faba611793b 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_table/ya.make @@ -10,6 +10,7 @@ GENERATE_ENUM_SERIALIZATION(table_enum.h) PEERDIR( ydb/public/api/protos ydb/public/sdk/cpp/client/impl/ydb_internal/make_request + ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common ydb/public/sdk/cpp/client/ydb_driver ydb/public/sdk/cpp/client/ydb_params ydb/public/sdk/cpp/client/ydb_proto |