aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-08-02 15:20:53 +0300
committerdcherednik <dcherednik@ydb.tech>2023-08-02 15:20:53 +0300
commitd199195371ee005f086b1a6d061c955c828b199d (patch)
treedae85b0bf6915ca56ce03c19c92cda704ee3c9d2
parentbf2a438dc5d975e2b908ee20895e46331d23211b (diff)
downloadydb-d199195371ee005f086b1a6d061c955c828b199d.tar.gz
Experimental session support for query service. KIKIMR-18788
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h6
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp34
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h3
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h1
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h1
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp37
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h58
-rw-r--r--ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make1
-rw-r--r--ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp315
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.h76
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp62
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h40
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/impl/ya.make1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp35
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h69
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h3
-rw-r--r--ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp79
30 files changed, 720 insertions, 131 deletions
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index cc852fa465f..3275666d298 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -143,8 +143,10 @@ public:
.UseQueryCache(false));
}
- NYdb::NQuery::TQueryClient GetQueryClient() const {
- return NYdb::NQuery::TQueryClient(*Driver);
+ NYdb::NQuery::TQueryClient GetQueryClient(
+ NYdb::NQuery::TClientSettings settings = NYdb::NQuery::TClientSettings()) const
+ {
+ return NYdb::NQuery::TQueryClient(*Driver, settings);
}
bool IsUsingSnapshotReads() const {
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 8e7df95f899..c46f698bf92 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -4,6 +4,8 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+
namespace NKikimr {
namespace NKqp {
@@ -11,6 +13,38 @@ using namespace NYdb;
using namespace NYdb::NQuery;
Y_UNIT_TEST_SUITE(KqpQueryService) {
+ Y_UNIT_TEST(SessionFromPoolError) {
+ auto kikimr = DefaultKikimrRunner();
+ auto settings = NYdb::NQuery::TClientSettings().Database("WrongDB");
+ auto db = kikimr.GetQueryClient(settings);
+
+ auto result = db.GetSession().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::NOT_FOUND, result.GetIssues().ToString());
+ }
+
+ Y_UNIT_TEST(SessionFromPoolSuccess) {
+ auto kikimr = DefaultKikimrRunner();
+ NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
+
+ {
+ auto db = kikimr.GetQueryClient();
+
+ TString id;
+ {
+ auto result = db.GetSession().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetSession().GetId());
+ id = result.GetSession().GetId();
+ }
+ {
+ auto result = db.GetSession().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(result.GetSession().GetId(), id);
+ }
+ }
+ WaitForZeroSessions(counters);
+ }
+
Y_UNIT_TEST(StreamExecuteQueryPure) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
index 3f3b85ede05..40fa6b39f8c 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h
@@ -399,6 +399,7 @@ public:
TStreamRpc<TService, TRequest, TResponse, NGrpc::TStreamRequestReadProcessor> rpc,
TDbDriverStatePtr dbState,
const TRpcRequestSettings& requestSettings,
+ const TEndpointKey& preferredEndpoint = TEndpointKey(),
std::shared_ptr<IQueueClientContext> context = nullptr)
{
using NGrpc::TGrpcStatus;
@@ -489,7 +490,7 @@ public:
std::move(rpc),
std::move(meta),
context.get());
- }, dbState, TEndpointKey(), endpointPolicy);
+ }, dbState, preferredEndpoint, endpointPolicy);
}
template<class TService, class TRequest, class TResponse, class TCallback>
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
index 6a083f47509..617e897b182 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.cpp
@@ -4,7 +4,7 @@
namespace NYdb {
-static ui64 GetNodeIdFromSession(const TStringType& sessionId) {
+ui64 GetNodeIdFromSession(const TStringType& sessionId) {
if (sessionId.empty()) {
return 0;
}
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
index a932f7e54e3..46170b0ca9e 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h
@@ -12,6 +12,7 @@
namespace NYdb {
////////////////////////////////////////////////////////////////////////////////
+ui64 GetNodeIdFromSession(const TStringType& sessionId);
class TKqpSessionCommon : public TEndpointObj {
public:
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h
index ab6393261cf..49f26b28d0e 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h
@@ -11,7 +11,6 @@ class TKqpSessionCommon;
class ISessionClient {
public:
virtual ~ISessionClient() = default;
- virtual void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) = 0;
virtual void DeleteSession(TKqpSessionCommon* sessionImpl) = 0;
// TODO: Try to remove from ISessionClient
virtual bool ReturnSession(TKqpSessionCommon* sessionImpl) = 0;
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt
index f46a9677174..8cdf68c0bcb 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.darwin-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC
cpp-threading-future
api-protos
client-impl-ydb_endpoints
+ client-ydb_types-operation
public-issue-protos
)
target_sources(impl-ydb_internal-session_pool PRIVATE
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt
index 6248ba9c489..f399a8d31e8 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-aarch64.txt
@@ -15,6 +15,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC
cpp-threading-future
api-protos
client-impl-ydb_endpoints
+ client-ydb_types-operation
public-issue-protos
)
target_sources(impl-ydb_internal-session_pool PRIVATE
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt
index 6248ba9c489..f399a8d31e8 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.linux-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC
cpp-threading-future
api-protos
client-impl-ydb_endpoints
+ client-ydb_types-operation
public-issue-protos
)
target_sources(impl-ydb_internal-session_pool PRIVATE
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt
index f46a9677174..8cdf68c0bcb 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/CMakeLists.windows-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(impl-ydb_internal-session_pool PUBLIC
cpp-threading-future
api-protos
client-impl-ydb_endpoints
+ client-ydb_types-operation
public-issue-protos
)
target_sources(impl-ydb_internal-session_pool PRIVATE
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
index 4b7310084bd..8dc87b46f59 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.cpp
@@ -4,11 +4,17 @@
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h>
#undef INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
+
+#include <util/random/random.h>
+
namespace NYdb {
namespace NSessionPool {
using namespace NThreading;
+constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4;
static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus(
TPlainStatus(
EStatus::CLIENT_RESOURCE_EXHAUSTED,
@@ -16,6 +22,37 @@ static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus(
)
);
+TStatus GetStatus(const TOperation& operation) {
+ return operation.Status();
+}
+
+TStatus GetStatus(const TStatus& status) {
+ return status;
+}
+
+TDuration RandomizeThreshold(TDuration duration) {
+ TDuration::TValue value = duration.GetValue();
+ if (KEEP_ALIVE_RANDOM_FRACTION) {
+ const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION;
+ if (randomLimit < 2)
+ return duration;
+ value += static_cast<i64>(RandomNumber<ui64>(randomLimit));
+ }
+ return TDuration::FromValue(value);
+}
+
+bool IsSessionCloseRequested(const TStatus& status) {
+ const auto& meta = status.GetResponseMetadata();
+ auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS);
+ for(auto it = hints.first; it != hints.second; ++it) {
+ if (it->second == NYdb::YDB_SESSION_CLOSE) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout)
: MaxQueueSize_(maxQueueSize)
, MaxWaitSessionTimeout_(maxWaitSessionTimeout)
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h
index 561f5e119f9..7d9f2fc2b5b 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h
@@ -7,6 +7,7 @@
namespace NYdb {
+class TOperation;
namespace NSessionPool {
class IGetSessionCtx : private TNonCopyable {
@@ -18,10 +19,67 @@ public:
virtual void ReplyNewSession() = 0;
};
+//How often run session pool keep alive check
+constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5);
constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session
constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval
constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool
+TStatus GetStatus(const TOperation& operation);
+TStatus GetStatus(const TStatus& status);
+
+TDuration RandomizeThreshold(TDuration duration);
+bool IsSessionCloseRequested(const TStatus& status);
+
+template<typename TResponse>
+NThreading::TFuture<TResponse> InjectSessionStatusInterception(
+ std::shared_ptr<::NYdb::TKqpSessionCommon> impl, NThreading::TFuture<TResponse> asyncResponse,
+ bool updateTimeout,
+ TDuration timeout,
+ std::function<void(const TResponse&, TKqpSessionCommon&)> cb = {})
+{
+ auto promise = NThreading::NewPromise<TResponse>();
+ asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable {
+ Y_VERIFY(future.HasValue());
+
+ // TResponse can hold refcounted user provided data (TSession for example)
+ // and we do not want to have copy of it (for example it can cause delay dtor call)
+ // so using move semantic here is mandatory.
+ // Also we must reset captured shared pointer to session impl
+ TResponse value = std::move(future.ExtractValue());
+
+ const TStatus& status = GetStatus(value);
+ // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect
+ // since we have guarantee this request wasn't been started to execute.
+
+ if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
+ impl->MarkBroken();
+ } else if (status.GetStatus() == EStatus::SESSION_BUSY) {
+ impl->MarkBroken();
+ } else if (status.GetStatus() == EStatus::BAD_SESSION) {
+ impl->MarkBroken();
+ } else if (IsSessionCloseRequested(status)) {
+ impl->MarkAsClosing();
+ } else {
+ // NOTE: About GetState and lock
+ // Simultanious call multiple requests on the same session make no sence, due to server limitation.
+ // But user can perform this call, right now we do not protect session from this, it may cause
+ // raise on session state if respoise is not success.
+ // It should not be a problem - in case of this race we close session
+ // or put it in to settler.
+ if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
+ impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TKqpSessionCommon::EState::S_ACTIVE);
+ }
+ }
+ if (cb) {
+ cb(value, *impl);
+ }
+ impl.reset();
+ promise.SetValue(std::move(value));
+ });
+ return promise.GetFuture();
+}
+
class TSessionPool {
private:
class TWaitersQueue {
diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make
index 9e3c6beae85..0bb2529a8da 100644
--- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make
+++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/ya.make
@@ -8,6 +8,7 @@ PEERDIR(
library/cpp/threading/future
ydb/public/api/protos
ydb/public/sdk/cpp/client/impl/ydb_endpoints
+ ydb/public/sdk/cpp/client/ydb_types/operation
ydb/library/yql/public/issue/protos
)
diff --git a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
index 160fe391c82..914ec967116 100644
--- a/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
+++ b/ydb/public/sdk/cpp/client/resources/ydb_sdk_version.txt
@@ -1 +1 @@
-2.5.5 \ No newline at end of file
+2.6.0 \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index 253e14bc425..2ca4d633572 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -1,7 +1,11 @@
#include "client.h"
+#include "impl/client_session.h"
#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_endpoints/endpoints.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_client/session_client.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h>
#undef INCLUDE_YDB_INTERNAL_H
#include <ydb/public/lib/operation_id/operation_id.h>
@@ -10,11 +14,17 @@
namespace NYdb::NQuery {
-class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl> {
+TCreateSessionSettings::TCreateSessionSettings() {
+ ClientTimeout_ = TDuration::Seconds(5);
+};
+
+class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public ISessionClient {
+ friend class ::NYdb::NQuery::TSession;
public:
TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TClientSettings& settings)
: TClientImplCommon(std::move(connections), settings)
, Settings_(settings)
+ , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_)
{
}
@@ -23,15 +33,18 @@ public:
}
TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings)
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId = {})
{
- return TExecQueryImpl::StreamExecuteQuery(Connections_, DbDriverState_, query, txControl, params, settings);
+ return TExecQueryImpl::StreamExecuteQuery(
+ Connections_, DbDriverState_, query, txControl, params, settings, sessionId);
}
TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings)
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings,
+ const TString& sessionId = {})
{
- return TExecQueryImpl::ExecuteQuery(Connections_, DbDriverState_, query, txControl, params, settings);
+ return TExecQueryImpl::ExecuteQuery(
+ Connections_, DbDriverState_, query, txControl, params, settings, sessionId);
}
NThreading::TFuture<TScriptExecutionOperation> ExecuteScript(const TString& script, const TExecuteScriptSettings& settings) {
@@ -127,13 +140,222 @@ public:
return promise.GetFuture();
}
+ void DeleteSession(TKqpSessionCommon* sessionImpl) override {
+ //TODO: Remove this copy-paste
+
+ // Closing not owned by session pool session should not fire getting new session
+ if (sessionImpl->IsOwnedBySessionPool()) {
+ if (SessionPool_.CheckAndFeedWaiterNewSession(sessionImpl->NeedUpdateActiveCounter())) {
+ // We requested new session for waiter which already incremented
+ // active session counter and old session will be deleted
+ // - skip update active counter in this case
+ sessionImpl->SetNeedUpdateActiveCounter(false);
+ }
+ }
+
+ if (sessionImpl->NeedUpdateActiveCounter()) {
+ SessionPool_.DecrementActiveCounter();
+ }
+
+ delete sessionImpl;
+ }
+
+ bool ReturnSession(TKqpSessionCommon* sessionImpl) override {
+ Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_ACTIVE ||
+ sessionImpl->GetState() == TSession::TImpl::S_IDLE);
+
+ //TODO: Remove this copy-paste from table client
+ bool needUpdateCounter = sessionImpl->NeedUpdateActiveCounter();
+ // Also removes NeedUpdateActiveCounter flag
+ sessionImpl->MarkIdle();
+ sessionImpl->SetTimeInterval(TDuration::Zero());
+ if (!SessionPool_.ReturnSession(sessionImpl, needUpdateCounter)) {
+ sessionImpl->SetNeedUpdateActiveCounter(needUpdateCounter);
+ return false;
+ }
+ return true;
+ }
+
+ void DoAttachSession(Ydb::Query::CreateSessionResponse* resp,
+ NThreading::TPromise<TCreateSessionResult> promise, const TString& endpoint,
+ std::shared_ptr<TQueryClient::TImpl> client)
+ {
+ using TStreamProcessorPtr = TSession::TImpl::TStreamProcessorPtr;
+ Ydb::Query::AttachSessionRequest request;
+ const auto sessionId = resp->session_id();
+ request.set_session_id(sessionId);
+
+ const auto endpointKey = TEndpointKey(endpoint, GetNodeIdFromSession(sessionId));
+
+ auto args = std::make_shared<TSession::TImpl::TAttachSessionArgs>(promise, sessionId, endpoint, client);
+
+ // Do not pass client timeout here. Session must be alive
+ static const TRpcRequestSettings rpcSettings;
+
+ Connections_->StartReadStream<
+ Ydb::Query::V1::QueryService,
+ Ydb::Query::AttachSessionRequest,
+ Ydb::Query::SessionState>
+ (
+ std::move(request),
+ [args] (TPlainStatus status, TStreamProcessorPtr processor) mutable {
+ if (processor) {
+ TSession::TImpl::MakeImplAsync(processor, args);
+ } else {
+ TStatus st(std::move(status));
+ args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
+ }
+ },
+ &Ydb::Query::V1::QueryService::Stub::AsyncAttachSession,
+ DbDriverState_,
+ rpcSettings,
+ endpointKey);
+ }
+
+ TAsyncCreateSessionResult CreateAttachedSession(TDuration timeout) {
+ using namespace Ydb::Query;
+
+ Ydb::Query::CreateSessionRequest request;
+
+ auto promise = NThreading::NewPromise<TCreateSessionResult>();
+
+ auto self = shared_from_this();
+
+ auto extractor = [promise, self] (Ydb::Query::CreateSessionResponse* resp, TPlainStatus status) mutable {
+ if (resp) {
+ if (resp->status() != Ydb::StatusIds::SUCCESS) {
+ NYql::TIssues opIssues;
+ NYql::IssuesFromMessage(resp->issues(), opIssues);
+ TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues));
+ promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
+ } else {
+ self->DoAttachSession(resp, promise, status.Endpoint, self);
+ }
+ } else {
+ TStatus st(std::move(status));
+ promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
+ }
+ };
+
+ TRpcRequestSettings rpcSettings;
+ rpcSettings.ClientTimeout = timeout;
+
+ Connections_->Run<V1::QueryService, CreateSessionRequest, CreateSessionResponse>(
+ std::move(request),
+ extractor,
+ &V1::QueryService::Stub::AsyncCreateSession,
+ DbDriverState_,
+ rpcSettings,
+ TEndpointKey());
+
+ return promise.GetFuture();
+ }
+
+ TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings) {
+ using namespace NSessionPool;
+
+ class TQueryClientGetSessionCtx : public IGetSessionCtx {
+ public:
+ TQueryClientGetSessionCtx(std::shared_ptr<TQueryClient::TImpl> client, TDuration timeout)
+ : Promise(NThreading::NewPromise<TCreateSessionResult>())
+ , Client(client)
+ , ClientTimeout(timeout)
+ {}
+
+ TAsyncCreateSessionResult GetFuture() {
+ return Promise.GetFuture();
+ }
+
+ void ReplyError(TStatus status) override {
+ TSession session;
+ ScheduleReply(TCreateSessionResult(std::move(status), std::move(session)));
+ }
+
+ void ReplySessionToUser(TKqpSessionCommon* session) override {
+ TCreateSessionResult val(
+ TStatus(TPlainStatus()),
+ TSession(
+ Client,
+ static_cast<TSession::TImpl*>(session)
+ )
+ );
+
+ ScheduleReply(std::move(val));
+ }
+
+ void ReplyNewSession() override {
+ Client->CreateAttachedSession(ClientTimeout).Subscribe(
+ [promise{std::move(Promise)}](TAsyncCreateSessionResult future) mutable
+ {
+ promise.SetValue(future.ExtractValue());
+ });
+ }
+
+ private:
+ void ScheduleReply(TCreateSessionResult val) {
+ Promise.SetValue(std::move(val));
+ }
+ NThreading::TPromise<TCreateSessionResult> Promise;
+ std::shared_ptr<TQueryClient::TImpl> Client;
+ TDuration ClientTimeout;
+ };
+
+ auto ctx = std::make_unique<TQueryClientGetSessionCtx>(shared_from_this(), settings.ClientTimeout_);
+ auto future = ctx->GetFuture();
+ SessionPool_.GetSession(std::move(ctx));
+ return future;
+ }
+
+ i64 GetActiveSessionCount() const {
+ return SessionPool_.GetActiveSessions();
+ }
+
+ i64 GetActiveSessionsLimit() const {
+ return SessionPool_.GetActiveSessionsLimit();
+ }
+
+ i64 GetCurrentPoolSize() const {
+ return SessionPool_.GetCurrentPoolSize();
+ }
+
+ void StartPeriodicSessionPoolTask() {
+ // Session pool guarantees than client is alive during call callbacks
+ auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) {
+
+ const auto& sessionPoolSettings = Settings_.SessionPoolSettings_;
+ const auto spentTime = s->GetTimeToTouchFast() - s->GetTimeInPastFast();
+
+ if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) {
+ if (sessionsCount > sessionPoolSettings.MinPoolSize_) {
+ return true;
+ }
+ }
+
+ return false;
+ };
+
+ // No need to keep-alive
+ auto keepAliveCmd = [](TKqpSessionCommon*) {
+ };
+
+ std::weak_ptr<TQueryClient::TImpl> weak = shared_from_this();
+ Connections_->AddPeriodicTask(
+ SessionPool_.CreatePeriodicTask(
+ weak,
+ std::move(keepAliveCmd),
+ std::move(deletePredicate)
+ ), NSessionPool::PERIODIC_ACTION_INTERVAL);
+ }
+
private:
TClientSettings Settings_;
+ NSessionPool::TSessionPool SessionPool_;
};
TQueryClient::TQueryClient(const TDriver& driver, const TClientSettings& settings)
: Impl_(new TQueryClient::TImpl(CreateInternalInterface(driver), settings))
{
+ Impl_->StartPeriodicSessionPoolTask();
}
TAsyncExecuteQueryResult TQueryClient::ExecuteQuery(const TString& query, const TTxControl& txControl,
@@ -173,4 +395,87 @@ TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const NKikimr::N
return Impl_->FetchScriptResults(operationId, resultSetIndex, settings);
}
+TAsyncCreateSessionResult TQueryClient::GetSession(const TCreateSessionSettings& settings)
+{
+ return Impl_->GetSession(settings);
+}
+
+i64 TQueryClient::GetActiveSessionCount() const {
+ return Impl_->GetActiveSessionCount();
+}
+
+i64 TQueryClient::GetActiveSessionsLimit() const {
+ return Impl_->GetActiveSessionsLimit();
+}
+
+i64 TQueryClient::GetCurrentPoolSize() const {
+ return Impl_->GetCurrentPoolSize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCreateSessionResult::TCreateSessionResult(TStatus&& status, TSession&& session)
+ : TStatus(std::move(status))
+ , Session_(std::move(session))
+{}
+
+TSession TCreateSessionResult::GetSession() const {
+ CheckStatusOk("TCreateSessionResult::GetSession");
+ return Session_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TSession::TSession()
+{}
+
+TSession::TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* session)
+ : Client_(client)
+ , SessionImpl_(session, TKqpSessionCommon::GetSmartDeleter(client))
+{}
+
+const TString& TSession::GetId() const {
+ return SessionImpl_->GetId();
+}
+
+TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings)
+{
+ return NSessionPool::InjectSessionStatusInterception(
+ SessionImpl_,
+ Client_->ExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()),
+ true,
+ Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
+}
+
+TAsyncExecuteQueryResult TSession::ExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TParams& params, const TExecuteQuerySettings& settings)
+{
+ return NSessionPool::InjectSessionStatusInterception(
+ SessionImpl_,
+ Client_->ExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()),
+ true,
+ Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
+}
+
+TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings)
+{
+ return NSessionPool::InjectSessionStatusInterception(
+ SessionImpl_,
+ Client_->StreamExecuteQuery(query, txControl, {}, settings, SessionImpl_->GetId()),
+ true,
+ Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
+}
+
+TAsyncExecuteQueryIterator TSession::StreamExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TParams& params, const TExecuteQuerySettings& settings)
+{
+ return NSessionPool::InjectSessionStatusInterception(
+ SessionImpl_,
+ Client_->StreamExecuteQuery(query, txControl, params, settings, SessionImpl_->GetId()),
+ true,
+ Client_->Settings_.SessionPoolSettings_.CloseIdleThreshold_);
+}
+
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h
index 7e823825e27..88e73e6689a 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.h
@@ -15,15 +15,43 @@ namespace NYdb {
namespace NYdb::NQuery {
+struct TCreateSessionSettings : public TSimpleRequestSettings<TCreateSessionSettings> {
+ TCreateSessionSettings();
+};
+
+class TCreateSessionResult;
+using TAsyncCreateSessionResult = NThreading::TFuture<TCreateSessionResult>;
+
+struct TSessionPoolSettings {
+ using TSelf = TSessionPoolSettings;
+
+ // Max number of sessions client can get from session pool
+ FLUENT_SETTING_DEFAULT(ui32, MaxActiveSessions, 50);
+
+ // Max time session to be in idle state before closing
+ FLUENT_SETTING_DEFAULT(TDuration, CloseIdleThreshold, TDuration::Minutes(1));
+
+ // Min number of session in session pool.
+ // Sessions will not be closed by CloseIdleThreshold if the number of sessions less then this limit.
+ FLUENT_SETTING_DEFAULT(ui32, MinPoolSize, 10);
+};
+
struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> {
+ using TSessionPoolSettings = TSessionPoolSettings;
using TSelf = TClientSettings;
+ FLUENT_SETTING(TSessionPoolSettings, SessionPoolSettings);
};
// ! WARNING: Experimental API
// ! This API is currently in experimental state and is a subject for changes.
// ! No backward and/or forward compatibility guarantees are provided.
// ! DO NOT USE for production workloads.
+class TSession;
class TQueryClient {
+ friend class TSession;
+public:
+ using TSettings = TClientSettings;
+ using TSession = TSession;
public:
TQueryClient(const TDriver& driver, const TClientSettings& settings = TClientSettings());
@@ -45,9 +73,57 @@ public:
TAsyncFetchScriptResultsResult FetchScriptResults(const NKikimr::NOperationId::TOperationId& operationId, int64_t resultSetIndex,
const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings());
+ TAsyncCreateSessionResult GetSession(const TCreateSessionSettings& settings = TCreateSessionSettings());
+
+ //! Returns number of active sessions given via session pool
+ i64 GetActiveSessionCount() const;
+
+ //! Returns the maximum number of sessions in session pool
+ i64 GetActiveSessionsLimit() const;
+
+ //! Returns the size of session pool
+ i64 GetCurrentPoolSize() const;
+
private:
class TImpl;
std::shared_ptr<TImpl> Impl_;
};
+class TSession {
+ friend class TQueryClient;
+public:
+ const TString& GetId() const;
+
+ TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings = TExecuteQuerySettings());
+
+ TAsyncExecuteQueryResult ExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TParams& params, const TExecuteQuerySettings& settings = TExecuteQuerySettings());
+
+ TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TExecuteQuerySettings& settings = TExecuteQuerySettings());
+
+ TAsyncExecuteQueryIterator StreamExecuteQuery(const TString& query, const TTxControl& txControl,
+ const TParams& params, const TExecuteQuerySettings& settings = TExecuteQuerySettings());
+
+
+ class TImpl;
+private:
+ TSession();
+ TSession(std::shared_ptr<TQueryClient::TImpl> client, TSession::TImpl* sessionImpl);
+
+ std::shared_ptr<TQueryClient::TImpl> Client_;
+ std::shared_ptr<TSession::TImpl> SessionImpl_;
+};
+
+class TCreateSessionResult: public TStatus {
+ friend class TSession::TImpl;
+public:
+ TCreateSessionResult(TStatus&& status, TSession&& session);
+ TSession GetSession() const;
+
+private:
+ TSession Session_;
+};
+
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt
index 89fba989351..5636e882602 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.darwin-x86_64.txt
@@ -18,4 +18,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC
)
target_sources(client-ydb_query-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt
index 89e9a242f2d..09ad124d137 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-aarch64.txt
@@ -19,4 +19,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC
)
target_sources(client-ydb_query-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt
index 89e9a242f2d..09ad124d137 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.linux-x86_64.txt
@@ -19,4 +19,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC
)
target_sources(client-ydb_query-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt
index 89fba989351..5636e882602 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/CMakeLists.windows-x86_64.txt
@@ -18,4 +18,5 @@ target_link_libraries(client-ydb_query-impl PUBLIC
)
target_sources(client-ydb_query-impl PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
)
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
new file mode 100644
index 00000000000..daa45e45efb
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
@@ -0,0 +1,62 @@
+#include "client_session.h"
+
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
+namespace NYdb::NQuery {
+
+TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const TString& endpoint)
+ : TKqpSessionCommon(sessionId, endpoint, true)
+ , StreamProcessor_(ptr)
+{
+ MarkActive();
+ SetNeedUpdateActiveCounter(true);
+}
+
+TSession::TImpl::~TImpl()
+{
+ StreamProcessor_->Cancel();
+}
+
+void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
+ std::shared_ptr<TAttachSessionArgs> args)
+{
+ auto resp = std::make_shared<Ydb::Query::SessionState>();
+ ptr->Read(resp.get(), [args, resp, ptr](NGrpc::TGrpcStatus grpcStatus) mutable {
+ if (grpcStatus.GRpcStatusCode != grpc::StatusCode::OK) {
+ TStatus st(TPlainStatus(grpcStatus, args->Endpoint));
+ args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
+
+ } else {
+ if (resp->status() == Ydb::StatusIds::SUCCESS) {
+ NYdb::TStatus st(TPlainStatus(grpcStatus, args->Endpoint));
+ TSession::TImpl::NewSmartShared(ptr, std::move(args), st);
+
+ } else {
+ NYql::TIssues opIssues;
+ NYql::IssuesFromMessage(resp->issues(), opIssues);
+ TStatus st(static_cast<EStatus>(resp->status()), std::move(opIssues));
+ args->Promise.SetValue(TCreateSessionResult(std::move(st), TSession()));
+ }
+ }
+ });
+}
+
+void TSession::TImpl::NewSmartShared(TStreamProcessorPtr ptr,
+ std::shared_ptr<TAttachSessionArgs> args, NYdb::TStatus st)
+{
+ args->Promise.SetValue(
+ TCreateSessionResult(
+ std::move(st),
+ TSession(
+ args->Client,
+ new TSession::TImpl(ptr, args->SessionId, args->Endpoint)
+ )
+ )
+ );
+}
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h
new file mode 100644
index 00000000000..fd9d224291e
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/client_session.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h>
+
+namespace NYdb::NQuery {
+
+class TSession::TImpl : public TKqpSessionCommon {
+public:
+ struct TAttachSessionArgs {
+ TAttachSessionArgs(NThreading::TPromise<TCreateSessionResult> promise,
+ TString sessionId,
+ TString endpoint,
+ std::shared_ptr<TQueryClient::TImpl> client)
+ : Promise(promise)
+ , SessionId(sessionId)
+ , Endpoint(endpoint)
+ , Client(client)
+ { }
+ NThreading::TPromise<TCreateSessionResult> Promise;
+ TString SessionId;
+ TString Endpoint;
+ std::shared_ptr<TQueryClient::TImpl> Client;
+ };
+
+ using TResponse = Ydb::Query::SessionState;
+ using TStreamProcessorPtr = NGrpc::IStreamRequestReadProcessor<TResponse>::TPtr;
+ TImpl(TStreamProcessorPtr ptr, const TString& id, const TString& endpoint);
+ ~TImpl();
+
+ static void MakeImplAsync(TStreamProcessorPtr processor, std::shared_ptr<TAttachSessionArgs> args);
+
+private:
+ static void NewSmartShared(TStreamProcessorPtr ptr, std::shared_ptr<TAttachSessionArgs> args, NYdb::TStatus status);
+
+private:
+ TStreamProcessorPtr StreamProcessor_;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
index a511c0ef42a..6cf63446187 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
@@ -182,13 +182,16 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryImpl(
const std::shared_ptr<TGRpcConnectionsImpl>& connections, const TDbDriverStatePtr& driverState,
const TString& query, const TTxControl& txControl, const ::google::protobuf::Map<TString, Ydb::TypedValue>* params,
- const TExecuteQuerySettings& settings)
+ const TExecuteQuerySettings& settings, const TString& sessionId)
{
auto request = MakeRequest<Ydb::Query::ExecuteQueryRequest>();
request.set_exec_mode(::Ydb::Query::ExecMode(settings.ExecMode_));
request.set_stats_mode(::Ydb::Query::StatsMode(settings.StatsMode_));
request.mutable_query_content()->set_text(query);
request.mutable_query_content()->set_syntax(::Ydb::Query::Syntax(settings.Syntax_));
+ if (sessionId) {
+ request.set_session_id(sessionId);
+ }
if (settings.ConcurrentResultSets_) {
request.set_concurrent_result_sets(*settings.ConcurrentResultSets_);
@@ -232,7 +235,7 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm
TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings)
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId)
{
auto promise = NewPromise<TExecuteQueryIterator>();
@@ -251,19 +254,19 @@ TAsyncExecuteQueryIterator TExecQueryImpl::StreamExecuteQuery(const std::shared_
? &params->GetProtoMap()
: nullptr;
- StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings)
+ StreamExecuteQueryImpl(connections, driverState, query, txControl, paramsProto, settings, sessionId)
.Subscribe(iteratorCallback);
return promise.GetFuture();
}
TAsyncExecuteQueryResult TExecQueryImpl::ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings)
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId)
{
auto syncSettings = settings;
syncSettings.ConcurrentResultSets(true);
- return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings)
+ return StreamExecuteQuery(connections, driverState, query, txControl, params, syncSettings, sessionId)
.Apply([](TAsyncExecuteQueryIterator itFuture){
auto it = itFuture.ExtractValue();
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h
index b7fd5b08728..24765225a70 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.h
@@ -13,11 +13,11 @@ class TExecQueryImpl {
public:
static TAsyncExecuteQueryIterator StreamExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings);
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId);
static TAsyncExecuteQueryResult ExecuteQuery(const std::shared_ptr<TGRpcConnectionsImpl>& connections,
const TDbDriverStatePtr& driverState, const TString& query, const TTxControl& txControl,
- const TMaybe<TParams>& params, const TExecuteQuerySettings& settings);
+ const TMaybe<TParams>& params, const TExecuteQuerySettings& settings, const TString& sessionId);
};
} // namespace NYdb::NQuery::NImpl
diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make
index 969bcdd7745..23bae5bf53f 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_query/impl/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
SRCS(
exec_query.cpp
exec_query.h
+ client_session.cpp
)
PEERDIR(
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
index d6a122bfec0..478f927f6a6 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.cpp
@@ -8,28 +8,6 @@ using namespace NThreading;
const TKeepAliveSettings TTableClient::TImpl::KeepAliveSettings = TKeepAliveSettings().ClientTimeout(KEEP_ALIVE_CLIENT_TIMEOUT);
-bool IsSessionCloseRequested(const TStatus& status) {
- const auto& meta = status.GetResponseMetadata();
- auto hints = meta.equal_range(NYdb::YDB_SERVER_HINTS);
- for(auto it = hints.first; it != hints.second; ++it) {
- if (it->second == NYdb::YDB_SESSION_CLOSE) {
- return true;
- }
- }
-
- return false;
-}
-
-TDuration RandomizeThreshold(TDuration duration) {
- TDuration::TValue value = duration.GetValue();
- if (KEEP_ALIVE_RANDOM_FRACTION) {
- const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION;
- if (randomLimit < 2)
- return duration;
- value += static_cast<i64>(RandomNumber<ui64>(randomLimit));
- }
- return TDuration::FromValue(value);
-}
TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings) {
return Min(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_);
@@ -39,14 +17,6 @@ TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings) {
return Max(settings.CloseIdleThreshold_, settings.KeepAliveIdleThreshold_);
}
-TStatus GetStatus(const TOperation& operation) {
- return operation.Status();
-}
-
-TStatus GetStatus(const TStatus& status) {
- return status;
-}
-
ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber) {
ui32 backoffSlots = 1 << std::min(retryNumber, settings.Ceiling_);
TDuration maxDuration = settings.SlotDuration_ * backoffSlots;
@@ -143,7 +113,6 @@ void TTableClient::TImpl::AsyncBackoff(const TBackoffSettings& settings, ui32 re
}
void TTableClient::TImpl::StartPeriodicSessionPoolTask() {
-
// Session pool guarantees than client is alive during call callbacks
auto deletePredicate = [this](TKqpSessionCommon* s, size_t sessionsCount) {
@@ -205,7 +174,7 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() {
auto timeToNextTouch = calcTimeToNextTouch(spentTime);
session.SessionImpl_->ScheduleTimeToTouchFast(
- RandomizeThreshold(timeToNextTouch),
+ NSessionPool::RandomizeThreshold(timeToNextTouch),
spentTime >= maxTimeToTouch
);
};
@@ -216,7 +185,7 @@ void TTableClient::TImpl::StartPeriodicSessionPoolTask() {
weak,
std::move(keepAliveCmd),
std::move(deletePredicate)
- ), PERIODIC_ACTION_INTERVAL);
+ ), NSessionPool::PERIODIC_ACTION_INTERVAL);
}
ui64 TTableClient::TImpl::ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client) {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
index 3b73e782822..875c740a6b8 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/impl/table_client.h
@@ -13,8 +13,6 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
-#include <util/random/random.h>
-
#include "client_session.h"
#include "data_query.h"
#include "request_migrator.h"
@@ -24,73 +22,16 @@
namespace NYdb {
namespace NTable {
-//How often run session pool keep alive check
-constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5);
//How ofter run host scan to perform session balancing
constexpr TDuration HOSTSCAN_PERIODIC_ACTION_INTERVAL = TDuration::Seconds(2);
-constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4;
constexpr ui32 MAX_BACKOFF_DURATION_MS = TDuration::Hours(1).MilliSeconds();
constexpr TDuration KEEP_ALIVE_CLIENT_TIMEOUT = TDuration::Seconds(5);
TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings);
TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings);
ui32 CalcBackoffTime(const TBackoffSettings& settings, ui32 retryNumber);
-bool IsSessionCloseRequested(const TStatus& status);
-TDuration RandomizeThreshold(TDuration duration);
TDuration GetMinTimeToTouch(const TSessionPoolSettings& settings);
TDuration GetMaxTimeToTouch(const TSessionPoolSettings& settings);
-TStatus GetStatus(const TOperation& operation);
-TStatus GetStatus(const TStatus& status);
-
-template<typename TResponse>
-NThreading::TFuture<TResponse> InjectSessionStatusInterception(
- std::shared_ptr<TSession::TImpl>& impl, NThreading::TFuture<TResponse> asyncResponse,
- bool updateTimeout,
- TDuration timeout,
- std::function<void(const TResponse&, TSession::TImpl&)> cb = {})
-{
- auto promise = NThreading::NewPromise<TResponse>();
- asyncResponse.Subscribe([impl, promise, cb, updateTimeout, timeout](NThreading::TFuture<TResponse> future) mutable {
- Y_VERIFY(future.HasValue());
-
- // TResponse can hold refcounted user provided data (TSession for example)
- // and we do not want to have copy of it (for example it can cause delay dtor call)
- // so using move semantic here is mandatory.
- // Also we must reset captured shared pointer to session impl
- TResponse value = std::move(future.ExtractValue());
-
- const TStatus& status = GetStatus(value);
- // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect
- // since we have guarantee this request wasn't been started to execute.
-
- if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
- impl->MarkBroken();
- } else if (status.GetStatus() == EStatus::SESSION_BUSY) {
- impl->MarkBroken();
- } else if (status.GetStatus() == EStatus::BAD_SESSION) {
- impl->MarkBroken();
- } else if (IsSessionCloseRequested(status)) {
- impl->MarkAsClosing();
- } else {
- // NOTE: About GetState and lock
- // Simultanious call multiple requests on the same session make no sence, due to server limitation.
- // But user can perform this call, right now we do not protect session from this, it may cause
- // raise on session state if respoise is not success.
- // It should not be a problem - in case of this race we close session
- // or put it in to settler.
- if (updateTimeout && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
- impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TSession::TImpl::EState::S_ACTIVE);
- }
- }
- if (cb) {
- cb(value, *impl);
- }
- impl.reset();
- promise.SetValue(std::move(value));
- });
- return promise.GetFuture();
-}
-
class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public ISessionClient {
public:
@@ -105,7 +46,7 @@ public:
NThreading::TFuture<void> Drain();
NThreading::TFuture<void> Stop();
void ScheduleTask(const std::function<void()>& fn, TDuration timeout);
- void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout) override;
+ void ScheduleTaskUnsafe(std::function<void()>&& fn, TDuration timeout);
void AsyncBackoff(const TBackoffSettings& settings, ui32 retryNumber, const std::function<void()>& fn);
void StartPeriodicSessionPoolTask();
static ui64 ScanForeignLocations(std::shared_ptr<TTableClient::TImpl> client);
@@ -143,7 +84,7 @@ public:
CacheMissCounter.Inc();
- return InjectSessionStatusInterception(session.SessionImpl_,
+ return ::NYdb::NSessionPool::InjectSessionStatusInterception(session.SessionImpl_,
ExecuteDataQueryInternal(session, query, txControl, params, settings, false),
true, GetMinTimeToTouch(Settings_.SessionPoolSettings_));
}
@@ -153,13 +94,13 @@ public:
TParamsType params, const TExecDataQuerySettings& settings,
bool fromCache) {
TString queryKey = dataQuery.Impl_->GetTextHash();
- auto cb = [queryKey](const TDataQueryResult& result, TSession::TImpl& session) {
+ auto cb = [queryKey](const TDataQueryResult& result, TKqpSessionCommon& session) {
if (result.GetStatus() == EStatus::NOT_FOUND) {
- session.InvalidateQueryInCache(queryKey);
+ static_cast<TSession::TImpl&>(session).InvalidateQueryInCache(queryKey);
}
};
- return InjectSessionStatusInterception<TDataQueryResult>(
+ return ::NYdb::NSessionPool::InjectSessionStatusInterception<TDataQueryResult>(
session.SessionImpl_,
session.Client_->ExecuteDataQueryInternal(session, dataQuery, txControl, params, settings, fromCache),
true,
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index de3e4ebed32..035ef07168c 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -31,6 +31,7 @@ namespace NYdb {
namespace NTable {
using namespace NThreading;
+using namespace NSessionPool;
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index 38765041e6d..340c99b9767 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -930,6 +930,7 @@ struct TSessionPoolSettings {
struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> {
using TSelf = TClientSettings;
+ using TSessionPoolSettings = TSessionPoolSettings;
// Enable client query cache. Client query cache is used to map query text to
// prepared query id for ExecuteDataQuery calls on client side.
@@ -998,6 +999,8 @@ public:
using TOperationSyncFunc = std::function<TStatus(TSession session)>;
using TOperationWithoutSessionFunc = std::function<TAsyncStatus(TTableClient& tableClient)>;
using TOperationWithoutSessionSyncFunc = std::function<TStatus(TTableClient& tableClient)>;
+ using TSettings = TClientSettings;
+ using TSession = TSession;
public:
TTableClient(const TDriver& driver, const TClientSettings& settings = TClientSettings());
diff --git a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
index d4b82e3ab55..1ad11c74ea3 100644
--- a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
+++ b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
@@ -1,5 +1,6 @@
#include "ydb_common_ut.h"
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
@@ -204,7 +205,20 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
driver.Stop(true);
}
- Y_UNIT_TEST(MultiThreadSessionPoolLimitSync) {
+ void EnsureCanExecQuery(NYdb::NTable::TSession session) {
+ auto execStatus = session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus();
+ UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS);
+ }
+
+ void EnsureCanExecQuery(NYdb::NQuery::TSession session) {
+ auto execStatus = session.ExecuteQuery("SELECT 42;",
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync().GetStatus();
+ UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS);
+ }
+
+ template<typename TClient>
+ void DoMultiThreadSessionPoolLimitSync() {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();
@@ -215,14 +229,14 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
.SetEndpoint(location));
const int maxActiveSessions = 45;
- NYdb::NTable::TTableClient client(driver,
- TClientSettings()
+ TClient client(driver,
+ typename TClient::TSettings()
.SessionPoolSettings(
- TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
+ typename TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
constexpr int nThreads = 100;
NYdb::EStatus statuses[nThreads];
- TVector<TMaybe<NYdb::NTable::TSession>> sessions;
+ TVector<TMaybe<typename TClient::TSession>> sessions;
sessions.resize(nThreads);
TAtomic t = 0;
auto job = [client, &t, &statuses, &sessions]() mutable {
@@ -230,9 +244,7 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
int i = AtomicIncrement(t);
statuses[--i] = sessionResponse.GetStatus();
if (statuses[i] == EStatus::SUCCESS) {
- auto execStatus = sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus();
- UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS);
+ EnsureCanExecQuery(sessionResponse.GetSession());
sessions[i] = sessionResponse.GetSession();
}
};
@@ -270,7 +282,26 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
driver.Stop(true);
}
- Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessions) {
+ Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncTableClient) {
+ DoMultiThreadSessionPoolLimitSync<NYdb::NTable::TTableClient>();
+ }
+
+ Y_UNIT_TEST(MultiThreadSessionPoolLimitSyncQueryClient) {
+ DoMultiThreadSessionPoolLimitSync<NYdb::NQuery::TQueryClient>();
+ }
+
+ NYdb::NTable::TAsyncDataQueryResult ExecQueryAsync(NYdb::NTable::TSession session, const TString q) {
+ return session.ExecuteDataQuery(q,
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx());
+ }
+
+ NYdb::NQuery::TAsyncExecuteQueryResult ExecQueryAsync(NYdb::NQuery::TSession session, const TString q) {
+ return session.ExecuteQuery(q,
+ NYdb::NQuery::TTxControl::BeginTx().CommitTx());
+ }
+
+ template<typename T>
+ void DoMultiThreadMultipleRequestsOnSharedSessions() {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();
@@ -281,21 +312,21 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
.SetEndpoint(location));
const int maxActiveSessions = 10;
- NYdb::NTable::TTableClient client(driver,
- TClientSettings()
+ typename T::TClient client(driver,
+ typename T::TClient::TSettings()
.SessionPoolSettings(
- TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
+ typename T::TClient::TSettings::TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
constexpr int nThreads = 20;
constexpr int nRequests = 50;
- std::array<TVector<TAsyncDataQueryResult>, nThreads> results;
+ std::array<TVector<typename T::TResult>, nThreads> results;
TAtomic t = 0;
TAtomic validSessions = 0;
auto job = [client, &t, &results, &validSessions]() mutable {
auto sessionResponse = client.GetSession().ExtractValueSync();
int i = AtomicIncrement(t);
- TVector<TAsyncDataQueryResult>& r = results[--i];
+ TVector<typename T::TResult>& r = results[--i];
if (sessionResponse.GetStatus() != EStatus::SUCCESS) {
return;
@@ -303,8 +334,7 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
AtomicIncrement(validSessions);
for (int i = 0; i < nRequests; i++) {
- r.push_back(sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ r.push_back(ExecQueryAsync(sessionResponse.GetSession(), "SELECT 42;"));
}
};
IThreadFactory* pool = SystemThreadFactory();
@@ -354,6 +384,23 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
driver.Stop(true);
}
+ Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsTableClient) {
+ struct TTypeHelper {
+ using TClient = NYdb::NTable::TTableClient;
+ using TResult = NYdb::NTable::TAsyncDataQueryResult;
+ };
+ DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>();
+ }
+
+ // Enable after interactive tx support
+ //Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessionsQueryClient) {
+ // struct TTypeHelper {
+ // using TClient = NYdb::NQuery::TQueryClient;
+ // using TResult = NYdb::NQuery::TAsyncExecuteQueryResult;
+ // };
+ // DoMultiThreadMultipleRequestsOnSharedSessions<TTypeHelper>();
+ //}
+
Y_UNIT_TEST(SessionsServerLimit) {
NKikimrConfig::TAppConfig appConfig;
auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();