aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-07-21 10:29:59 +0300
committerdcherednik <dcherednik@ydb.tech>2023-07-21 10:29:59 +0300
commitf4ea7717398bf9fb9b334e265b41dfc4f54d1096 (patch)
treea1d5516eab84818c45ee74cfde998613adc3f362
parent91b32fadb3237b325674c27da4c48d741d7dcda3 (diff)
downloadydb-f4ea7717398bf9fb9b334e265b41dfc4f54d1096.tar.gz
Split TableService session for common and table service part. KIKIMR-18788
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt1
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.darwin-x86_64.txt20
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-aarch64.txt21
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-x86_64.txt21
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt17
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.windows-x86_64.txt20
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp177
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h78
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/ya.make14
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h20
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp136
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h50
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp11
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp23
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/ya.make1
25 files changed, 424 insertions, 225 deletions
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt
index 7eb8172a98a..4cbaeff1960 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/CMakeLists.txt
@@ -9,6 +9,7 @@
add_subdirectory(common)
add_subdirectory(db_driver_state)
add_subdirectory(grpc_connections)
+add_subdirectory(kqp_session_common)
add_subdirectory(logger)
add_subdirectory(make_request)
add_subdirectory(plain_status)
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..8372e613afe
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(impl-ydb_internal-kqp_session_common)
+target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-threading-future
+ lib-operation_id-protos
+ client-impl-ydb_endpoints
+)
+target_sources(impl-ydb_internal-kqp_session_common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..0747556da90
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(impl-ydb_internal-kqp_session_common)
+target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-threading-future
+ lib-operation_id-protos
+ client-impl-ydb_endpoints
+)
+target_sources(impl-ydb_internal-kqp_session_common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..0747556da90
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(impl-ydb_internal-kqp_session_common)
+target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-threading-future
+ lib-operation_id-protos
+ client-impl-ydb_endpoints
+)
+target_sources(impl-ydb_internal-kqp_session_common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..8372e613afe
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(impl-ydb_internal-kqp_session_common)
+target_link_libraries(impl-ydb_internal-kqp_session_common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-threading-future
+ lib-operation_id-protos
+ client-impl-ydb_endpoints
+)
+target_sources(impl-ydb_internal-kqp_session_common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
new file mode 100644
index 00000000000..a4cf92fd617
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
@@ -0,0 +1,177 @@
+#include "kqp_session_common.h"
+
+#include <ydb/public/lib/operation_id/operation_id.h>
+
+namespace NYdb {
+
+static ui64 GetNodeIdFromSession(const TStringType& sessionId) {
+ if (sessionId.empty()) {
+ return 0;
+ }
+
+ try {
+ NKikimr::NOperationId::TOperationId opId(sessionId);
+ const auto& nodeIds = opId.GetValue("node_id");
+ if (nodeIds.size() != 1) {
+ return 0;
+ }
+
+ return FromStringWithDefault<ui64>(*nodeIds[0], 0);
+
+ } catch (...) {
+ return 0;
+ }
+ return 0;
+}
+
+TKqpSessionCommon::TKqpSessionCommon(
+ const TStringType& sessionId, const TStringType& endpoint,
+ bool isOwnedBySessionPool)
+ : Lock_()
+ , SessionId_(sessionId)
+ , EndpointKey_(endpoint, GetNodeIdFromSession(sessionId))
+ , IsOwnedBySessionPool_(isOwnedBySessionPool)
+ , State_(S_STANDALONE)
+ , TimeToTouch_(TInstant::Now())
+ , TimeInPast_(TInstant::Now())
+ , NeedUpdateActiveCounter_(false)
+{}
+
+TKqpSessionCommon::~TKqpSessionCommon() {
+ Unlink();
+}
+
+const TStringType& TKqpSessionCommon::GetId() const {
+ return SessionId_;
+}
+
+const TStringType& TKqpSessionCommon::GetEndpoint() const {
+ return EndpointKey_.GetEndpoint();
+}
+
+const TEndpointKey& TKqpSessionCommon::GetEndpointKey() const {
+ return EndpointKey_;
+}
+
+// Can be called from interceptor, need lock
+void TKqpSessionCommon::MarkBroken() {
+ with_lock(Lock_) {
+ if (State_ == EState::S_ACTIVE) {
+ NeedUpdateActiveCounter_ = true;
+ }
+ State_ = EState::S_BROKEN;
+ }
+}
+
+void TKqpSessionCommon::MarkAsClosing() {
+ with_lock(Lock_) {
+ if (State_ == EState::S_ACTIVE) {
+ NeedUpdateActiveCounter_ = true;
+ }
+
+ State_ = EState::S_CLOSING;
+ }
+}
+
+void TKqpSessionCommon::MarkStandalone() {
+ State_ = EState::S_STANDALONE;
+ NeedUpdateActiveCounter_ = false;
+}
+
+void TKqpSessionCommon::MarkActive() {
+ State_ = EState::S_ACTIVE;
+ NeedUpdateActiveCounter_ = false;
+}
+
+void TKqpSessionCommon::MarkIdle() {
+ State_ = EState::S_IDLE;
+ NeedUpdateActiveCounter_ = false;
+}
+
+bool TKqpSessionCommon::IsOwnedBySessionPool() const {
+ return IsOwnedBySessionPool_;
+}
+
+TKqpSessionCommon::EState TKqpSessionCommon::GetState() const {
+ // See comments in InjectSessionStatusInterception about lock
+ with_lock(Lock_) {
+ return State_;
+ }
+}
+
+void TKqpSessionCommon::SetNeedUpdateActiveCounter(bool flag) {
+ NeedUpdateActiveCounter_ = flag;
+}
+
+bool TKqpSessionCommon::NeedUpdateActiveCounter() const {
+ return NeedUpdateActiveCounter_;
+}
+
+// We need lock here because this method can be called from different thread
+// if client makes simultaneous calls on one session.
+// It should be possible to rewrite this part.
+void TKqpSessionCommon::ScheduleTimeToTouch(TDuration interval,
+ bool updateTimeInPast)
+{
+ auto now = TInstant::Now();
+ with_lock(Lock_) {
+ if (updateTimeInPast) {
+ TimeInPast_ = now;
+ }
+ TimeToTouch_ = now + interval;
+ }
+}
+
+void TKqpSessionCommon::ScheduleTimeToTouchFast(TDuration interval,
+ bool updateTimeInPast)
+{
+ auto now = TInstant::Now();
+ if (updateTimeInPast) {
+ TimeInPast_ = now;
+ }
+ TimeToTouch_ = now + interval;
+}
+
+TInstant TKqpSessionCommon::GetTimeToTouchFast() const {
+ return TimeToTouch_;
+}
+
+TInstant TKqpSessionCommon::GetTimeInPastFast() const {
+ return TimeInPast_;
+}
+
+// SetTimeInterval/GetTimeInterval, are not atomic!
+void TKqpSessionCommon::SetTimeInterval(TDuration interval) {
+ TimeInterval_ = interval;
+}
+
+TDuration TKqpSessionCommon::GetTimeInterval() const {
+ return TimeInterval_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::function<void(TKqpSessionCommon*)> TKqpSessionCommon::GetSmartDeleter(
+ std::shared_ptr<ISessionClient> client)
+{
+ return [client](TKqpSessionCommon* sessionImpl) {
+ switch (sessionImpl->GetState()) {
+ case TKqpSessionCommon::S_STANDALONE:
+ case TKqpSessionCommon::S_BROKEN:
+ case TKqpSessionCommon::S_CLOSING:
+ client->DeleteSession(sessionImpl);
+ break;
+ case TKqpSessionCommon::S_IDLE:
+ case TKqpSessionCommon::S_ACTIVE: {
+ if (!client->ReturnSession(sessionImpl)) {
+ client->DeleteSession(sessionImpl);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ };
+}
+
+} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
new file mode 100644
index 00000000000..58d59b56cab
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
@@ -0,0 +1,78 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/type_switcher.h>
+
+#include <util/datetime/base.h>
+#include <util/system/spinlock.h>
+
+#include <functional>
+
+namespace NYdb {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TKqpSessionCommon : public TEndpointObj {
+public:
+ TKqpSessionCommon(const TStringType& sessionId, const TStringType& endpoint,
+ bool isOwnedBySessionPool);
+
+ enum EState {
+ S_STANDALONE,
+ S_IDLE,
+ S_BROKEN,
+ S_ACTIVE,
+ S_CLOSING
+ };
+
+public:
+ ~TKqpSessionCommon();
+
+ const TStringType& GetId() const;
+ const TStringType& GetEndpoint() const;
+ const TEndpointKey& GetEndpointKey() const;
+ void MarkBroken();
+ void MarkAsClosing();
+ void MarkStandalone();
+ void MarkActive();
+ void MarkIdle();
+ bool IsOwnedBySessionPool() const;
+ EState GetState() const;
+ void SetNeedUpdateActiveCounter(bool flag);
+ bool NeedUpdateActiveCounter() const;
+ void InvalidateQueryInCache(const TStringType& key);
+ void InvalidateQueryCache();
+ void ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast);
+ void ScheduleTimeToTouchFast(TDuration interval, bool updateTimeInPast);
+ TInstant GetTimeToTouchFast() const;
+ TInstant GetTimeInPastFast() const;
+
+ // SetTimeInterval/GetTimeInterval, are not atomic!
+ void SetTimeInterval(TDuration interval);
+ TDuration GetTimeInterval() const;
+
+ static std::function<void(TKqpSessionCommon*)>
+ GetSmartDeleter(std::shared_ptr<ISessionClient> client);
+
+protected:
+ TAdaptiveLock Lock_;
+
+private:
+ const TStringType SessionId_;
+ const TEndpointKey EndpointKey_;
+ const bool IsOwnedBySessionPool_;
+
+ EState State_;
+ TInstant TimeToTouch_;
+ TInstant TimeInPast_;
+ // Is used to implement progressive timeout for settler keep alive call
+ TDuration TimeInterval_;
+ // Indicate session was in active state, but state was changed
+ // (need to decrement active session counter)
+ // TODO: suboptimal because need lock for atomic change from interceptor
+ // Rewrite with bit field
+ bool NeedUpdateActiveCounter_;
+};
+
+} // namespace NYdb
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/ya.make b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/ya.make
new file mode 100644
index 00000000000..ac4ebdf6e06
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ kqp_session_common.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/future
+ ydb/public/lib/operation_id/protos
+ ydb/public/sdk/cpp/client/impl/ydb_endpoints
+)
+
+
+END()
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h
new file mode 100644
index 00000000000..ab6393261cf
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <functional>
+
+#include <util/datetime/base.h>
+
+namespace NYdb {
+
+class TKqpSessionCommon;
+
+class ISessionClient {
+public:
+ virtual ~ISessionClient() = default;
+ virtual void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) = 0;
+ virtual void DeleteSession(TKqpSessionCommon* sessionImpl) = 0;
+ // TODO: Try to remove from ISessionClient
+ virtual bool ReturnSession(TKqpSessionCommon* sessionImpl) = 0;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index 4fd0fe3cd58..21b159dc8b3 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.5.1 \ No newline at end of file
+2.5.2 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt
index 5d35f202cc5..33b6f4812af 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.darwin-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC
tools-enum_parser-enum_serialization_runtime
api-protos
impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
cpp-client-ydb_driver
cpp-client-ydb_params
cpp-client-ydb_proto
diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt
index cc4ba76506e..241f404ed39 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC
tools-enum_parser-enum_serialization_runtime
api-protos
impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
cpp-client-ydb_driver
cpp-client-ydb_params
cpp-client-ydb_proto
diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt
index cc4ba76506e..241f404ed39 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.linux-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC
tools-enum_parser-enum_serialization_runtime
api-protos
impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
cpp-client-ydb_driver
cpp-client-ydb_params
cpp-client-ydb_proto
diff --git a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt
index 5d35f202cc5..33b6f4812af 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_table/CMakeLists.windows-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(cpp-client-ydb_table PUBLIC
tools-enum_parser-enum_serialization_runtime
api-protos
impl-ydb_internal-make_request
+ impl-ydb_internal-kqp_session_common
cpp-client-ydb_driver
cpp-client-ydb_params
cpp-client-ydb_proto
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
index a15d9d9fa13..772e7c4aa36 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.cpp
@@ -6,109 +6,12 @@
namespace NYdb {
namespace NTable {
-ui64 GetNodeIdFromSession(const TString& sessionId) {
- if (sessionId.empty()) {
- return 0;
- }
-
- try {
- NKikimr::NOperationId::TOperationId opId(sessionId);
- const auto& nodeIds = opId.GetValue("node_id");
- if (nodeIds.size() != 1) {
- return 0;
- }
-
- return FromStringWithDefault<ui64>(*nodeIds[0], 0);
-
- } catch (...) {
- return 0;
- }
- return 0;
-}
-
TSession::TImpl::TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool)
- : SessionId_(sessionId)
- , EndpointKey_(endpoint, GetNodeIdFromSession(sessionId))
- , State_(S_STANDALONE)
+ : TKqpSessionCommon(sessionId, endpoint, isOwnedBySessionPool)
, UseQueryCache_(useQueryCache)
, QueryCache_(queryCacheSize)
- , Lock_()
- , TimeToTouch_(TInstant::Now())
- , TimeInPast_(TInstant::Now())
- , NeedUpdateActiveCounter_(false)
- , IsOwnedBySessionPool_(isOwnedBySessionPool)
{}
-TSession::TImpl::~TImpl() {
- Unlink();
-}
-
-const TString& TSession::TImpl::GetId() const {
- return SessionId_;
-}
-
-const TString& TSession::TImpl::GetEndpoint() const {
- return EndpointKey_.GetEndpoint();
-}
-
-const TEndpointKey& TSession::TImpl::GetEndpointKey() const {
- return EndpointKey_;
-}
-
-// Can be called from interceptor, need lock
-void TSession::TImpl::MarkBroken() {
- with_lock(Lock_) {
- if (State_ == EState::S_ACTIVE) {
- NeedUpdateActiveCounter_ = true;
- }
- State_ = EState::S_BROKEN;
- }
-}
-
-void TSession::TImpl::MarkAsClosing() {
- with_lock(Lock_) {
- if (State_ == EState::S_ACTIVE) {
- NeedUpdateActiveCounter_ = true;
- }
-
- State_ = EState::S_CLOSING;
- }
-}
-
-void TSession::TImpl::MarkStandalone() {
- State_ = EState::S_STANDALONE;
- NeedUpdateActiveCounter_ = false;
-}
-
-void TSession::TImpl::MarkActive() {
- State_ = EState::S_ACTIVE;
- NeedUpdateActiveCounter_ = false;
-}
-
-void TSession::TImpl::MarkIdle() {
- State_ = EState::S_IDLE;
- NeedUpdateActiveCounter_ = false;
-}
-
-bool TSession::TImpl::IsOwnedBySessionPool() const {
- return IsOwnedBySessionPool_;
-}
-
-TSession::TImpl::EState TSession::TImpl::GetState() const {
- // See comments in InjectSessionStatusInterception about lock
- with_lock(Lock_) {
- return State_;
- }
-}
-
-void TSession::TImpl::SetNeedUpdateActiveCounter(bool flag) {
- NeedUpdateActiveCounter_ = flag;
-}
-
-bool TSession::TImpl::NeedUpdateActiveCounter() const {
- return NeedUpdateActiveCounter_;
-}
-
void TSession::TImpl::InvalidateQueryInCache(const TString& key) {
if (!UseQueryCache_) {
return;
@@ -172,43 +75,6 @@ void TSession::TImpl::AddQueryToCache(const TDataQuery& query) {
}
}
-// We need lock here because this method can be called from different thread if client
-// makes simultaneous calls on one session. It should be possible to rewrite this part.
-void TSession::TImpl::ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast) {
- auto now = TInstant::Now();
- with_lock(Lock_) {
- if (updateTimeInPast) {
- TimeInPast_ = now;
- }
- TimeToTouch_ = now + interval;
- }
-}
-
-void TSession::TImpl::ScheduleTimeToTouchFast(TDuration interval, bool updateTimeInPast) {
- auto now = TInstant::Now();
- if (updateTimeInPast) {
- TimeInPast_ = now;
- }
- TimeToTouch_ = now + interval;
-}
-
-TInstant TSession::TImpl::GetTimeToTouchFast() const {
- return TimeToTouch_;
-}
-
-TInstant TSession::TImpl::GetTimeInPastFast() const {
- return TimeInPast_;
-}
-
-// SetTimeInterval/GetTimeInterval, are not atomic!
-void TSession::TImpl::SetTimeInterval(TDuration interval) {
- TimeInterval_ = interval;
-}
-
-TDuration TSession::TImpl::GetTimeInterval() const {
- return TimeInterval_;
-}
-
const TLRUCache<TString, TSession::TImpl::TDataQueryInfo>& TSession::TImpl::GetQueryCacheUnsafe() const {
return QueryCache_;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
index 9fb0330fe4d..4b9d482f604 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/client_session.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h>
#include <ydb/public/lib/operation_id/operation_id.h>
@@ -20,7 +21,7 @@ namespace NTable {
using TSessionInspectorFn = std::function<void(TAsyncCreateSessionResult future)>;
-class TSession::TImpl : public TEndpointObj {
+class TSession::TImpl : public TKqpSessionCommon {
friend class TTableClient;
friend class TSession;
@@ -29,14 +30,6 @@ public:
#endif
TImpl(const TString& sessionId, const TString& endpoint, bool useQueryCache, ui32 queryCacheSize, bool isOwnedBySessionPool);
public:
- enum EState {
- S_STANDALONE,
- S_IDLE,
- S_BROKEN,
- S_ACTIVE,
- S_CLOSING
- };
-
struct TDataQueryInfo {
TString QueryId;
::google::protobuf::Map<TString, Ydb::Type> ParameterTypes;
@@ -49,37 +42,15 @@ public:
, ParameterTypes(parameterTypes) {}
};
public:
- ~TImpl();
-
- const TString& GetId() const;
- const TString& GetEndpoint() const;
- const TEndpointKey& GetEndpointKey() const;
- void MarkBroken();
- void MarkAsClosing();
- void MarkStandalone();
- void MarkActive();
- void MarkIdle();
- bool IsOwnedBySessionPool() const;
- EState GetState() const;
- void SetNeedUpdateActiveCounter(bool flag);
- bool NeedUpdateActiveCounter() const;
+ ~TImpl() = default;
+
void InvalidateQueryInCache(const TString& key);
void InvalidateQueryCache();
TMaybe<TDataQueryInfo> GetQueryFromCache(const TString& query, bool allowMigration);
void AddQueryToCache(const TDataQuery& query);
- void ScheduleTimeToTouch(TDuration interval, bool updateTimeInPast);
- void ScheduleTimeToTouchFast(TDuration interval, bool updateTimeInPast);
- TInstant GetTimeToTouchFast() const;
- TInstant GetTimeInPastFast() const;
-
- // SetTimeInterval/GetTimeInterval, are not atomic!
- void SetTimeInterval(TDuration interval);
- TDuration GetTimeInterval() const;
const TLRUCache<TString, TDataQueryInfo>& GetQueryCacheUnsafe() const;
- static std::function<void(TSession::TImpl*)> GetSmartDeleter(std::shared_ptr<TTableClient::TImpl> client);
-
static TSessionInspectorFn GetSessionInspector(
NThreading::TPromise<TCreateSessionResult>& promise,
std::shared_ptr<TTableClient::TImpl> client,
@@ -87,21 +58,8 @@ public:
ui32 counter, bool needUpdateActiveSessionCounter);
private:
- const TString SessionId_;
- const TEndpointKey EndpointKey_;
- EState State_;
bool UseQueryCache_;
TLRUCache<TString, TDataQueryInfo> QueryCache_;
- TAdaptiveLock Lock_;
- TInstant TimeToTouch_;
- TInstant TimeInPast_;
- // Is used to implement progressive timeout for settler keep alive call
- TDuration TimeInterval_;
- // Indicate session was in active state, but state was changed (need to decrement active session counter)
- // TODO: suboptimal because need lock for atomic change from interceptor
- // Rewrite with bit field
- bool NeedUpdateActiveCounter_;
- const bool IsOwnedBySessionPool_;
};
} // namespace NTable
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
index 35f91b69b0d..21fe00039b4 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.cpp
@@ -40,7 +40,7 @@ void TRequestMigrator::SetHost(ui64 nodeId) {
CurHost_ = nodeId;
}
-bool TRequestMigrator::IsOurSession(const TSession::TImpl* session) const {
+bool TRequestMigrator::IsOurSession(const TKqpSessionCommon* session) const {
if (!CurHost_)
return false;
@@ -65,7 +65,7 @@ bool TRequestMigrator::Reset() {
}
}
-bool TRequestMigrator::DoCheckAndMigrate(const TSession::TImpl* session) {
+bool TRequestMigrator::DoCheckAndMigrate(const TKqpSessionCommon* session) {
if (session->GetEndpoint().empty())
return false;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
index fbe6875b3aa..b2b146047b7 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h
@@ -51,13 +51,13 @@ public:
// Returns false if session is not suitable or unable to get lock to start migration
// Returns true if session is suitable in this case Unlink methos on the session is called
// This methos is thread safe.
- bool DoCheckAndMigrate(const TSession::TImpl* session);
+ bool DoCheckAndMigrate(const TKqpSessionCommon* session);
// Reset migrator to initiall state if migration was not started and returns true
// Returns false if migration was started
bool Reset();
private:
- bool IsOurSession(const TSession::TImpl* session) const;
+ bool IsOurSession(const TKqpSessionCommon* session) const;
ui64 CurHost_ = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
index cdd314c787f..53e98fde737 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.cpp
@@ -88,7 +88,7 @@ void TSessionPool::CreateFakeSession(
}
void TSessionPool::MakeSessionPromiseFromSession(
- TSession::TImpl* session,
+ TKqpSessionCommon* session,
NThreading::TPromise<TCreateSessionResult>& promise,
std::shared_ptr<TTableClient::TImpl> client
) {
@@ -102,7 +102,8 @@ void TSessionPool::MakeSessionPromiseFromSession(
TSession(
client,
std::shared_ptr<TSession::TImpl>(
- session, TSession::TImpl::GetSmartDeleter(client)
+ static_cast<TSession::TImpl*>(session),
+ TSession::TImpl::GetSmartDeleter(client)
)
)
);
@@ -201,7 +202,7 @@ bool TSessionPool::CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TI
return true;
}
-bool TSessionPool::ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) {
+bool TSessionPool::ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client) {
// Do not set promise under the session pool lock
NThreading::TPromise<TCreateSessionResult> createSessionPromise;
{
@@ -214,7 +215,9 @@ bool TSessionPool::ReturnSession(TSession::TImpl* impl, bool active, std::shared
if (!active)
IncrementActiveCounterUnsafe();
} else {
- Sessions_.emplace(std::make_pair(impl->GetTimeToTouchFast(), impl));
+ Sessions_.emplace(std::make_pair(
+ impl->GetTimeToTouchFast(),
+ static_cast<TSession::TImpl*>(impl)));
if (active) {
Y_VERIFY(ActiveSessions_);
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
index 7a83b4115bb..702fe185f74 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/session_pool.h
@@ -48,7 +48,7 @@ public:
std::shared_ptr<TTableClient::TImpl> client,
const TCreateSessionSettings& settings);
// Returns true if session returned to pool successfully
- bool ReturnSession(TSession::TImpl* impl, bool active, std::shared_ptr<TTableClient::TImpl> client);
+ bool ReturnSession(TKqpSessionCommon* impl, bool active, std::shared_ptr<TTableClient::TImpl> client);
// Returns trun if has waiter and scheduled to create new session
// too feed it
bool CheckAndFeedWaiterNewSession(std::shared_ptr<TTableClient::TImpl> client, bool active);
@@ -67,7 +67,7 @@ public:
private:
void UpdateStats();
void MakeSessionPromiseFromSession(
- TSession::TImpl* session,
+ TKqpSessionCommon* session,
NThreading::TPromise<TCreateSessionResult>& promise,
std::shared_ptr<TTableClient::TImpl> client
);
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index 4cd1ec054e1..3ddb4495c4f 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -824,7 +824,7 @@ TAsyncReadRowsResult TTableClient::TImpl::ReadRows(const TString& path, TValue&&
return promise.GetFuture();
}
-TAsyncStatus TTableClient::TImpl::Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings) {
+TAsyncStatus TTableClient::TImpl::Close(const TKqpSessionCommon* sessionImpl, const TCloseSessionSettings& settings) {
auto request = MakeOperationRequest<Ydb::Table::DeleteSessionRequest>(settings);
request.set_session_id(sessionImpl->GetId());
return RunSimple<Ydb::Table::V1::TableService, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse>(
@@ -834,7 +834,7 @@ TAsyncStatus TTableClient::TImpl::Close(const TSession::TImpl* sessionImpl, cons
sessionImpl->GetEndpointKey());
}
-TAsyncStatus TTableClient::TImpl::CloseInternal(const TSession::TImpl* sessionImpl) {
+TAsyncStatus TTableClient::TImpl::CloseInternal(const TKqpSessionCommon* sessionImpl) {
static const auto internalCloseSessionSettings = TCloseSessionSettings()
.ClientTimeout(TDuration::Seconds(2));
@@ -847,7 +847,7 @@ TAsyncStatus TTableClient::TImpl::CloseInternal(const TSession::TImpl* sessionIm
});
}
-bool TTableClient::TImpl::ReturnSession(TSession::TImpl* sessionImpl) {
+bool TTableClient::TImpl::ReturnSession(TKqpSessionCommon* sessionImpl) {
Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE ||
sessionImpl->GetState() == TSession::TImpl::S_IDLE);
@@ -867,7 +867,7 @@ bool TTableClient::TImpl::ReturnSession(TSession::TImpl* sessionImpl) {
return true;
}
-void TTableClient::TImpl::DeleteSession(TSession::TImpl* sessionImpl) {
+void TTableClient::TImpl::DeleteSession(TKqpSessionCommon* sessionImpl) {
// Closing not owned by session pool session should not fire getting new session
if (sessionImpl->IsOwnedBySessionPool()) {
if (SessionPool_.CheckAndFeedWaiterNewSession(shared_from_this(), sessionImpl->NeedUpdateActiveCounter())) {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
index 73b8ee12d9b..d9c9161f195 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
@@ -1,6 +1,7 @@
#pragma once
#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/scheme_helpers/helpers.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/table_helpers/helpers.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
@@ -96,7 +97,7 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
}
-class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public IMigratorClient {
+class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public ISessionClient {
public:
using TReadTableStreamProcessorPtr = TTablePartIterator::TReaderImpl::TStreamProcessorPtr;
using TScanQueryProcessorPtr = TScanQueryPartIterator::TReaderImpl::TStreamProcessorPtr;
@@ -109,7 +110,7 @@ public:
NThreading::TFuture<void> Drain();
NThreading::TFuture<void> Stop();
void ScheduleTask(const std::function<void()>& fn, TDuration timeout);
- void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout);
+ void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) override;
void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn);
void StartPeriodicSessionPoolTask();
static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client);
@@ -194,11 +195,11 @@ public:
const TReadTableSettings& settings);
TAsyncReadRowsResult ReadRows(const TString& path, TValue&& keys, const TVector<TString>& columns, const TReadRowsSettings& settings);
- TAsyncStatus Close(const TSession::TImpl* sessionImpl, const TCloseSessionSettings& settings);
- TAsyncStatus CloseInternal(const TSession::TImpl* sessionImpl);
+ TAsyncStatus Close(const TKqpSessionCommon* sessionImpl, const TCloseSessionSettings& settings);
+ TAsyncStatus CloseInternal(const TKqpSessionCommon* sessionImpl);
- bool ReturnSession(TSession::TImpl* sessionImpl);
- void DeleteSession(TSession::TImpl* sessionImpl);
+ bool ReturnSession(TKqpSessionCommon* sessionImpl) override;
+ void DeleteSession(TKqpSessionCommon* sessionImpl) override;
ui32 GetSessionRetryLimit() const;
static void CloseAndDeleteSession(
std::unique_ptr<TSession::TImpl>&& impl,
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 461a41a7c85..10209fce5b9 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -2372,29 +2372,6 @@ const TMaybe<TQueryStats>& TCommitTransactionResult::GetStats() const {
////////////////////////////////////////////////////////////////////////////////
-std::function<void(TSession::TImpl*)> TSession::TImpl::GetSmartDeleter(std::shared_ptr<TTableClient::TImpl> client) {
- return [client](TSession::TImpl* sessionImpl) {
- switch (sessionImpl->GetState()) {
- case TSession::TImpl::S_STANDALONE:
- case TSession::TImpl::S_BROKEN:
- case TSession::TImpl::S_CLOSING:
- client->DeleteSession(sessionImpl);
- break;
- case TSession::TImpl::S_IDLE:
- case TSession::TImpl::S_ACTIVE: {
- if (!client->ReturnSession(sessionImpl)) {
- client->DeleteSession(sessionImpl);
- }
- break;
- }
- default:
- break;
- }
- };
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
TCopyItem::TCopyItem(const TString& source, const TString& destination)
: Source_(source)
, Destination_(destination)
diff --git a/ydb/public/sdk/cpp/client/ydb_table/ya.make b/ydb/public/sdk/cpp/client/ydb_table/ya.make
index c998ef545a8..faba611793b 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_table/ya.make
@@ -10,6 +10,7 @@ GENERATE_ENUM_SERIALIZATION(table_enum.h)
PEERDIR(
ydb/public/api/protos
ydb/public/sdk/cpp/client/impl/ydb_internal/make_request
+ ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common
ydb/public/sdk/cpp/client/ydb_driver
ydb/public/sdk/cpp/client/ydb_params
ydb/public/sdk/cpp/client/ydb_proto