aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-03-01 18:24:07 +0300
committerdcherednik <dcherednik@ydb.tech>2023-03-01 18:24:07 +0300
commit73d7c9861df84ecfff5cb0b23ec96e41866fc4c4 (patch)
tree77d8ccad837fb93b0652ffc5726cd64a5b9d465f
parent257d958d39fcc443f08920bfe8c1c87b760e1716 (diff)
downloadydb-73d7c9861df84ecfff5cb0b23ec96e41866fc4c4.tar.gz
Move sdk sessions tests in to separate directory to reduce timeout impact.
Otherwise there is small possibility starting session keep alive tasks during the run.
-rw-r--r--ydb/services/ydb/CMakeLists.darwin.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/ydb/CMakeLists.linux.txt1
-rw-r--r--ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt80
-rw-r--r--ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt82
-rw-r--r--ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt84
-rw-r--r--ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt15
-rw-r--r--ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp641
-rw-r--r--ydb/services/ydb/ydb_table_ut.cpp626
9 files changed, 905 insertions, 626 deletions
diff --git a/ydb/services/ydb/CMakeLists.darwin.txt b/ydb/services/ydb/CMakeLists.darwin.txt
index 58fbeb4c3c..813d577ed8 100644
--- a/ydb/services/ydb/CMakeLists.darwin.txt
+++ b/ydb/services/ydb/CMakeLists.darwin.txt
@@ -9,6 +9,7 @@
find_package(OpenSSL REQUIRED)
add_subdirectory(index_ut)
add_subdirectory(sdk_credprovider_ut)
+add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
index 788ede8cdf..2f77c61e3d 100644
--- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt
@@ -9,6 +9,7 @@
find_package(OpenSSL REQUIRED)
add_subdirectory(index_ut)
add_subdirectory(sdk_credprovider_ut)
+add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
diff --git a/ydb/services/ydb/CMakeLists.linux.txt b/ydb/services/ydb/CMakeLists.linux.txt
index 788ede8cdf..2f77c61e3d 100644
--- a/ydb/services/ydb/CMakeLists.linux.txt
+++ b/ydb/services/ydb/CMakeLists.linux.txt
@@ -9,6 +9,7 @@
find_package(OpenSSL REQUIRED)
add_subdirectory(index_ut)
add_subdirectory(sdk_credprovider_ut)
+add_subdirectory(sdk_sessions_ut)
add_subdirectory(table_split_ut)
add_subdirectory(ut)
diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..a55fad0b0d
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt
@@ -0,0 +1,80 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ydb-sdk_sessions_ut)
+target_compile_options(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb
+)
+target_link_libraries(ydb-services-ydb-sdk_sessions_ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-system
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ydb
+ cpp-grpc-client
+ core-testlib-default
+ ydb-core-testlib
+ cpp-client-ydb_table
+)
+target_link_options(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-ydb-sdk_sessions_ut
+ TEST_TARGET
+ ydb-services-ydb-sdk_sessions_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+vcs_info(ydb-services-ydb-sdk_sessions_ut)
diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..05e4326c4a
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,82 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ydb-sdk_sessions_ut)
+target_compile_options(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb
+)
+target_link_libraries(ydb-services-ydb-sdk_sessions_ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ cpp-testing-unittest_main
+ ydb-services-ydb
+ cpp-grpc-client
+ core-testlib-default
+ ydb-core-testlib
+ cpp-client-ydb_table
+)
+target_link_options(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-ydb-sdk_sessions_ut
+ TEST_TARGET
+ ydb-services-ydb-sdk_sessions_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+vcs_info(ydb-services-ydb-sdk_sessions_ut)
diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt
new file mode 100644
index 0000000000..632434c626
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt
@@ -0,0 +1,84 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ydb-sdk_sessions_ut)
+target_compile_options(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb
+)
+target_link_libraries(ydb-services-ydb-sdk_sessions_ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ydb
+ cpp-grpc-client
+ core-testlib-default
+ ydb-core-testlib
+ cpp-client-ydb_table
+)
+target_link_options(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-ydb-sdk_sessions_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-ydb-sdk_sessions_ut
+ TEST_TARGET
+ ydb-services-ydb-sdk_sessions_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ydb-sdk_sessions_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+vcs_info(ydb-services-ydb-sdk_sessions_ut)
diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt
new file mode 100644
index 0000000000..5bb4faffb4
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
new file mode 100644
index 0000000000..d4b82e3ab5
--- /dev/null
+++ b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
@@ -0,0 +1,641 @@
+#include "ydb_common_ut.h"
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+Y_UNIT_TEST_SUITE(YdbSdkSessions) {
+ Y_UNIT_TEST(TestSessionPool) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+
+ const TString location = TStringBuilder() << "localhost:" << grpc;
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 10;
+
+ THashSet<TString> sids;
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
+ auto session = sessionResponse.GetSession();
+ sids.insert(session.GetId());
+ auto result = session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetEndpoint(), location);
+ }
+ // All requests used one session
+ UNIT_ASSERT_VALUES_EQUAL(sids.size(), 1);
+ // No more session captured by client
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(TestMultipleSessions) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 10;
+
+ TVector<TSession> sids;
+ TVector<TAsyncDataQueryResult> results;
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
+ auto session = sessionResponse.GetSession();
+ sids.push_back(session);
+ results.push_back(session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ }
+
+ NThreading::WaitExceptionOrAll(results).Wait();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10);
+
+ for (auto& result : results) {
+ UNIT_ASSERT_EQUAL(result.GetValue().GetStatus(), EStatus::SUCCESS);
+ }
+ sids.clear();
+ results.clear();
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(TestActiveSessionCountAfterBadSession) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 10;
+
+ TVector<TSession> sids;
+ TVector<TAsyncDataQueryResult> results;
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
+ auto session = sessionResponse.GetSession();
+ sids.push_back(session);
+ if (count == 0) {
+ // Force BAD session server response for ExecuteDataQuery
+ UNIT_ASSERT_EQUAL(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS);
+ results.push_back(session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ } else {
+ results.push_back(session.ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ }
+ }
+
+ NThreading::WaitExceptionOrAll(results).Wait();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10);
+
+ for (size_t i = 0; i < results.size(); i++) {
+ if (i == 9) {
+ UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION);
+ } else {
+ UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::SUCCESS);
+ }
+ }
+ sids.clear();
+ results.clear();
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ int count = 100;
+
+ {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT(sessionResponse.IsSuccess());
+ auto session = sessionResponse.GetSession();
+ auto result = session.ExecuteSchemeQuery(R"___(
+ CREATE TABLE `Root/Test` (
+ Key Uint32,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )___").ExtractValueSync();
+ UNIT_ASSERT(result.IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ }
+
+ while (count--) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
+ auto session = sessionResponse.GetSession();
+
+ // Assume 10us is too small to execute query and get response
+ auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
+ NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(MultiThreadSync) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ NYdb::NTable::TTableClient client(driver);
+ const int nThreads = 10;
+ const int nRequests = 1000;
+ auto job = [client]() mutable {
+ for (int i = 0; i < nRequests; i++) {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(sessionResponse.GetStatus(), EStatus::SUCCESS);
+ }
+ };
+ IThreadFactory* pool = SystemThreadFactory();
+
+ TVector<TAutoPtr<IThreadFactory::IThread>> threads;
+ threads.resize(nThreads);
+ for (int i = 0; i < nThreads; i++) {
+ threads[i] = pool->Run(job);
+ }
+ for (int i = 0; i < nThreads; i++) {
+ threads[i]->Join();
+ }
+ UNIT_ASSERT_EQUAL(client.GetActiveSessionCount(), 0);
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(MultiThreadSessionPoolLimitSync) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ const int maxActiveSessions = 45;
+ NYdb::NTable::TTableClient client(driver,
+ TClientSettings()
+ .SessionPoolSettings(
+ TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
+
+ constexpr int nThreads = 100;
+ NYdb::EStatus statuses[nThreads];
+ TVector<TMaybe<NYdb::NTable::TSession>> sessions;
+ sessions.resize(nThreads);
+ TAtomic t = 0;
+ auto job = [client, &t, &statuses, &sessions]() mutable {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+ int i = AtomicIncrement(t);
+ statuses[--i] = sessionResponse.GetStatus();
+ if (statuses[i] == EStatus::SUCCESS) {
+ auto execStatus = sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus();
+ UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS);
+ sessions[i] = sessionResponse.GetSession();
+ }
+ };
+ IThreadFactory* pool = SystemThreadFactory();
+
+ TVector<TAutoPtr<IThreadFactory::IThread>> threads;
+ threads.resize(nThreads);
+ for (int i = 0; i < nThreads; i++) {
+ threads[i] = pool->Run(job);
+ }
+ for (int i = 0; i < nThreads; i++) {
+ threads[i]->Join();
+ }
+
+ sessions.clear();
+
+ int successCount = 0;
+ int exhaustedCount = 0;
+ for (int i = 0; i < nThreads; i++) {
+ switch (statuses[i]) {
+ case EStatus::SUCCESS:
+ successCount++;
+ break;
+ case EStatus::CLIENT_RESOURCE_EXHAUSTED:
+ exhaustedCount++;
+ break;
+ default:
+ UNIT_ASSERT(false);
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(successCount, maxActiveSessions);
+ UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, nThreads - maxActiveSessions);
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessions) {
+ TKikimrWithGrpcAndRootSchema server;
+ ui16 grpc = server.GetPort();
+
+ TString location = TStringBuilder() << "localhost:" << grpc;
+
+ auto driver = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(location));
+
+ const int maxActiveSessions = 10;
+ NYdb::NTable::TTableClient client(driver,
+ TClientSettings()
+ .SessionPoolSettings(
+ TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
+
+ constexpr int nThreads = 20;
+ constexpr int nRequests = 50;
+ std::array<TVector<TAsyncDataQueryResult>, nThreads> results;
+ TAtomic t = 0;
+ TAtomic validSessions = 0;
+ auto job = [client, &t, &results, &validSessions]() mutable {
+ auto sessionResponse = client.GetSession().ExtractValueSync();
+
+ int i = AtomicIncrement(t);
+ TVector<TAsyncDataQueryResult>& r = results[--i];
+
+ if (sessionResponse.GetStatus() != EStatus::SUCCESS) {
+ return;
+ }
+ AtomicIncrement(validSessions);
+
+ for (int i = 0; i < nRequests; i++) {
+ r.push_back(sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;",
+ TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
+ }
+ };
+ IThreadFactory* pool = SystemThreadFactory();
+
+ TVector<TAutoPtr<IThreadFactory::IThread>> threads;
+ threads.resize(nThreads);
+ for (int i = 0; i < nThreads; i++) {
+ threads[i] = pool->Run(job);
+ }
+ for (int i = 0; i < nThreads; i++) {
+ threads[i]->Join();
+ }
+
+ for (auto& r : results) {
+ NThreading::WaitExceptionOrAll(r).Wait();
+ }
+ for (auto& r : results) {
+ if (!r.empty()) {
+ int ok = 0;
+ int bad = 0;
+ for (auto& asyncStatus : r) {
+ auto res = asyncStatus.GetValue();
+ if (res.IsSuccess()) {
+ ok++;
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SESSION_BUSY);
+ bad++;
+ }
+ }
+ //UNIT_ASSERT_VALUES_EQUAL(ok, 1);
+ //UNIT_ASSERT_VALUES_EQUAL(bad, nRequests - 1);
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), maxActiveSessions);
+ auto curExpectedActive = maxActiveSessions;
+ auto empty = 0;
+ for (auto& r : results) {
+ if (!r.empty()) {
+ r.clear();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), --curExpectedActive);
+ } else {
+ empty++;
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(empty, nThreads - maxActiveSessions);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ driver.Stop(true);
+ }
+
+ Y_UNIT_TEST(SessionsServerLimit) {
+ NKikimrConfig::TAppConfig appConfig;
+ auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();
+ tableServiceConfig.SetSessionsLimitPerNode(2);
+
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+ NYdb::NTable::TTableClient client(driver);
+ auto sessionResult = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session2 = sessionResult.GetSession();
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ sessionResult.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED);
+
+ auto status = session1.Close().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS);
+
+ auto result = session2.ExecuteDataQuery(R"___(
+ SELECT 1;
+ )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+
+ sessionResult = client.CreateSession().ExtractValueSync();
+ sessionResult.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ }
+
+ Y_UNIT_TEST(SessionsServerLimitWithSessionPool) {
+ NKikimrConfig::TAppConfig appConfig;
+ auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();
+ tableServiceConfig.SetSessionsLimitPerNode(2);
+
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+ NYdb::NTable::TTableClient client(driver);
+ auto sessionResult1 = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ auto session1 = sessionResult1.GetSession();
+
+ auto sessionResult2 = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult2.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
+ auto session2 = sessionResult2.GetSession();
+
+ {
+ auto sessionResult3 = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
+
+ auto status = session1.Close().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS);
+
+ // Close doesnt free session from user perspective,
+ // the value of ActiveSessionsCounter will be same after Close() call.
+ // Probably we want to chenge this contract
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
+
+ auto result = session2.ExecuteDataQuery(R"___(
+ SELECT 1;
+ )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ sessionResult1 = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetSession().GetId().empty(), false);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3);
+
+ auto sessionResult3 = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4);
+
+ auto tmp = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 5);
+ sessionResult1 = tmp; // here we reset previous created session object,
+ // so perform close rpc call implicitly and delete it
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::OVERLOADED);
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4);
+ }
+
+ Y_UNIT_TEST(CloseSessionAfterDriverDtorWithoutSessionPool) {
+ NKikimrConfig::TAppConfig appConfig;
+
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+
+ TVector<TString> sessionIds;
+ int iterations = 50;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+ NYdb::NTable::TTableClient client(driver);
+ auto sessionResult = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+ }
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ UNIT_ASSERT(status.ok());
+ auto deferred = response.operation();
+ UNIT_ASSERT(deferred.ready() == true);
+ UNIT_ASSERT_VALUES_EQUAL(deferred.status(), Ydb::StatusIds::BAD_SESSION);
+ }
+ }
+
+ Y_UNIT_TEST(CloseSessionWithSessionPoolExplicit) {
+ NKikimrConfig::TAppConfig appConfig;
+
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+
+ TVector<TString> sessionIds;
+ int iterations = 100;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+ NYdb::NTable::TTableClient client(driver);
+ //TODO: remove this scope after session tracker implementation
+ {
+ auto sessionResult = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+
+ sessionResult = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ // Here previous created session will be returnet to session pool
+ session1 = sessionResult.GetSession();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ sessionIds.push_back(session1.GetId());
+ }
+
+ if (RandomNumber<ui32>(10) == 5) {
+ client.Stop().Apply([client](NThreading::TFuture<void> future){
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
+ return future;
+ }).Wait();
+ } else {
+ client.Stop().Wait();
+ }
+
+ if (iterations & 4) {
+ driver.Stop(true);
+ }
+ }
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ UNIT_ASSERT(status.ok());
+ auto deferred = response.operation();
+ UNIT_ASSERT(deferred.ready() == true);
+ UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
+ }
+ }
+
+ Y_UNIT_TEST(CloseSessionWithSessionPoolExplicitDriverStopOnly) {
+ NKikimrConfig::TAppConfig appConfig;
+
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+
+ TVector<TString> sessionIds;
+ int iterations = 100;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+ NYdb::NTable::TTableClient client(driver);
+ //TODO: remove this scope after session tracker implementation
+ {
+ auto sessionResult = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+
+ sessionResult = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ // Here previous created session will be returnet to session pool
+ session1 = sessionResult.GetSession();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ sessionIds.push_back(session1.GetId());
+ }
+ driver.Stop(true);
+ }
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ UNIT_ASSERT(status.ok());
+ auto deferred = response.operation();
+ UNIT_ASSERT(deferred.ready() == true);
+ UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
+ }
+ }
+
+ Y_UNIT_TEST(CloseSessionWithSessionPoolFromDtors) {
+ NKikimrConfig::TAppConfig appConfig;
+
+ TKikimrWithGrpcAndRootSchema server(appConfig);
+
+ TVector<TString> sessionIds;
+ int iterations = 100;
+
+ while (iterations--) {
+ NYdb::TDriver driver(TDriverConfig()
+ .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+ NYdb::NTable::TTableClient client(driver);
+ //TODO: remove this scope after session tracker implementation
+ {
+ auto sessionResult = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session1 = sessionResult.GetSession();
+ sessionIds.push_back(session1.GetId());
+
+ sessionResult = client.GetSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ // Here previous created session will be returnet to session pool
+ session1 = sessionResult.GetSession();
+ UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
+ sessionIds.push_back(session1.GetId());
+ }
+ }
+
+ std::shared_ptr<grpc::Channel> channel;
+ channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
+ auto stub = Ydb::Table::V1::TableService::NewStub(channel);
+ for (const auto& sessionId : sessionIds) {
+ grpc::ClientContext context;
+ Ydb::Table::KeepAliveRequest request;
+ request.set_session_id(sessionId);
+ Ydb::Table::KeepAliveResponse response;
+ auto status = stub->KeepAlive(&context, request, &response);
+ UNIT_ASSERT(status.ok());
+ auto deferred = response.operation();
+ UNIT_ASSERT(deferred.ready() == true);
+ UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
+ }
+ }
+}
diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp
index 2aa8cf08b6..abdf23ece7 100644
--- a/ydb/services/ydb/ydb_table_ut.cpp
+++ b/ydb/services/ydb/ydb_table_ut.cpp
@@ -239,347 +239,6 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) {
UNIT_ASSERT_EQUAL(status.GetStatus(), EStatus::SUCCESS);
}
- Y_UNIT_TEST(TestSessionPool) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- const TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 10;
-
- THashSet<TString> sids;
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
- sids.insert(session.GetId());
- auto result = session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
-
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(result.GetEndpoint(), location);
- }
- // All requests used one session
- UNIT_ASSERT_VALUES_EQUAL(sids.size(), 1);
- // No more session captured by client
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- }
-
- Y_UNIT_TEST(TestMultipleSessions) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 10;
-
- TVector<TSession> sids;
- TVector<TAsyncDataQueryResult> results;
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
- sids.push_back(session);
- results.push_back(session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
- }
-
- NThreading::WaitExceptionOrAll(results).Wait();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10);
-
- for (auto& result : results) {
- UNIT_ASSERT_EQUAL(result.GetValue().GetStatus(), EStatus::SUCCESS);
- }
- sids.clear();
- results.clear();
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- }
-
- Y_UNIT_TEST(TestActiveSessionCountAfterBadSession) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 10;
-
- TVector<TSession> sids;
- TVector<TAsyncDataQueryResult> results;
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
- sids.push_back(session);
- if (count == 0) {
- // Force BAD session server response for ExecuteDataQuery
- UNIT_ASSERT_EQUAL(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS);
- results.push_back(session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
- } else {
- results.push_back(session.ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
- }
- }
-
- NThreading::WaitExceptionOrAll(results).Wait();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10);
-
- for (size_t i = 0; i < results.size(); i++) {
- if (i == 9) {
- UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION);
- } else {
- UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::SUCCESS);
- }
- }
- sids.clear();
- results.clear();
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- }
-
- Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- int count = 100;
-
- {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT(sessionResponse.IsSuccess());
- auto session = sessionResponse.GetSession();
- auto result = session.ExecuteSchemeQuery(R"___(
- CREATE TABLE `Root/Test` (
- Key Uint32,
- Value String,
- PRIMARY KEY (Key)
- );
- )___").ExtractValueSync();
- UNIT_ASSERT(result.IsSuccess());
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- }
-
- while (count--) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false);
- auto session = sessionResponse.GetSession();
-
- // Assume 10us is too small to execute query and get response
- auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
- NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(MultiThreadSync) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- NYdb::NTable::TTableClient client(driver);
- const int nThreads = 10;
- const int nRequests = 1000;
- auto job = [client]() mutable {
- for (int i = 0; i < nRequests; i++) {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_EQUAL(sessionResponse.GetStatus(), EStatus::SUCCESS);
- }
- };
- IThreadFactory* pool = SystemThreadFactory();
-
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
- threads.resize(nThreads);
- for (int i = 0; i < nThreads; i++) {
- threads[i] = pool->Run(job);
- }
- for (int i = 0; i < nThreads; i++) {
- threads[i]->Join();
- }
- UNIT_ASSERT_EQUAL(client.GetActiveSessionCount(), 0);
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(MultiThreadSessionPoolLimitSync) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- const int maxActiveSessions = 45;
- NYdb::NTable::TTableClient client(driver,
- TClientSettings()
- .SessionPoolSettings(
- TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
-
- constexpr int nThreads = 100;
- NYdb::EStatus statuses[nThreads];
- TVector<TMaybe<NYdb::NTable::TSession>> sessions;
- sessions.resize(nThreads);
- TAtomic t = 0;
- auto job = [client, &t, &statuses, &sessions]() mutable {
- auto sessionResponse = client.GetSession().ExtractValueSync();
- int i = AtomicIncrement(t);
- statuses[--i] = sessionResponse.GetStatus();
- if (statuses[i] == EStatus::SUCCESS) {
- auto execStatus = sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus();
- UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS);
- sessions[i] = sessionResponse.GetSession();
- }
- };
- IThreadFactory* pool = SystemThreadFactory();
-
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
- threads.resize(nThreads);
- for (int i = 0; i < nThreads; i++) {
- threads[i] = pool->Run(job);
- }
- for (int i = 0; i < nThreads; i++) {
- threads[i]->Join();
- }
-
- sessions.clear();
-
- int successCount = 0;
- int exhaustedCount = 0;
- for (int i = 0; i < nThreads; i++) {
- switch (statuses[i]) {
- case EStatus::SUCCESS:
- successCount++;
- break;
- case EStatus::CLIENT_RESOURCE_EXHAUSTED:
- exhaustedCount++;
- break;
- default:
- UNIT_ASSERT(false);
- }
- }
-
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(successCount, maxActiveSessions);
- UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, nThreads - maxActiveSessions);
- driver.Stop(true);
- }
-
- Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessions) {
- TKikimrWithGrpcAndRootSchema server;
- ui16 grpc = server.GetPort();
-
- TString location = TStringBuilder() << "localhost:" << grpc;
-
- auto driver = NYdb::TDriver(
- TDriverConfig()
- .SetEndpoint(location));
-
- const int maxActiveSessions = 10;
- NYdb::NTable::TTableClient client(driver,
- TClientSettings()
- .SessionPoolSettings(
- TSessionPoolSettings().MaxActiveSessions(maxActiveSessions)));
-
- constexpr int nThreads = 20;
- constexpr int nRequests = 50;
- std::array<TVector<TAsyncDataQueryResult>, nThreads> results;
- TAtomic t = 0;
- TAtomic validSessions = 0;
- auto job = [client, &t, &results, &validSessions]() mutable {
- auto sessionResponse = client.GetSession().ExtractValueSync();
-
- int i = AtomicIncrement(t);
- TVector<TAsyncDataQueryResult>& r = results[--i];
-
- if (sessionResponse.GetStatus() != EStatus::SUCCESS) {
- return;
- }
- AtomicIncrement(validSessions);
-
- for (int i = 0; i < nRequests; i++) {
- r.push_back(sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;",
- TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()));
- }
- };
- IThreadFactory* pool = SystemThreadFactory();
-
- TVector<TAutoPtr<IThreadFactory::IThread>> threads;
- threads.resize(nThreads);
- for (int i = 0; i < nThreads; i++) {
- threads[i] = pool->Run(job);
- }
- for (int i = 0; i < nThreads; i++) {
- threads[i]->Join();
- }
-
- for (auto& r : results) {
- NThreading::WaitExceptionOrAll(r).Wait();
- }
- for (auto& r : results) {
- if (!r.empty()) {
- int ok = 0;
- int bad = 0;
- for (auto& asyncStatus : r) {
- auto res = asyncStatus.GetValue();
- if (res.IsSuccess()) {
- ok++;
- } else {
- UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SESSION_BUSY);
- bad++;
- }
- }
- //UNIT_ASSERT_VALUES_EQUAL(ok, 1);
- //UNIT_ASSERT_VALUES_EQUAL(bad, nRequests - 1);
- }
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), maxActiveSessions);
- auto curExpectedActive = maxActiveSessions;
- auto empty = 0;
- for (auto& r : results) {
- if (!r.empty()) {
- r.clear();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), --curExpectedActive);
- } else {
- empty++;
- }
- }
- UNIT_ASSERT_VALUES_EQUAL(empty, nThreads - maxActiveSessions);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- driver.Stop(true);
- }
-
Y_UNIT_TEST(TestColumnOrder) {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();
@@ -2287,291 +1946,6 @@ R"___(<main>: Error: Transaction not found: , code: 2015
UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
}
- Y_UNIT_TEST(SessionsServerLimit) {
- NKikimrConfig::TAppConfig appConfig;
- auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();
- tableServiceConfig.SetSessionsLimitPerNode(2);
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- auto sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
-
- sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session2 = sessionResult.GetSession();
-
- sessionResult = client.CreateSession().ExtractValueSync();
- sessionResult.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED);
-
- auto status = session1.Close().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS);
-
- auto result = session2.ExecuteDataQuery(R"___(
- SELECT 1;
- )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
-
- sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
-
- sessionResult = client.CreateSession().ExtractValueSync();
- sessionResult.GetIssues().PrintTo(Cerr);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- }
-
- Y_UNIT_TEST(SessionsServerLimitWithSessionPool) {
- NKikimrConfig::TAppConfig appConfig;
- auto& tableServiceConfig = *appConfig.MutableTableServiceConfig();
- tableServiceConfig.SetSessionsLimitPerNode(2);
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- auto sessionResult1 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- auto session1 = sessionResult1.GetSession();
-
- auto sessionResult2 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult2.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- auto session2 = sessionResult2.GetSession();
-
- {
- auto sessionResult3 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3);
- }
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
-
- auto status = session1.Close().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false);
- UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS);
-
- // Close doesnt free session from user perspective,
- // the value of ActiveSessionsCounter will be same after Close() call.
- // Probably we want to chenge this contract
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
-
- auto result = session2.ExecuteDataQuery(R"___(
- SELECT 1;
- )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
-
- sessionResult1 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetSession().GetId().empty(), false);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3);
-
- auto sessionResult3 = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4);
-
- auto tmp = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 5);
- sessionResult1 = tmp; // here we reset previous created session object,
- // so perform close rpc call implicitly and delete it
- UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::OVERLOADED);
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4);
- }
-
- Y_UNIT_TEST(CloseSessionAfterDriverDtorWithoutSessionPool) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- TVector<TString> sessionIds;
- int iterations = 50;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- auto sessionResult = client.CreateSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(sessionId);
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT_VALUES_EQUAL(deferred.status(), Ydb::StatusIds::BAD_SESSION);
- }
- }
-
- Y_UNIT_TEST(CloseSessionWithSessionPoolExplicit) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- TVector<TString> sessionIds;
- int iterations = 100;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- //TODO: remove this scope after session tracker implementation
- {
- auto sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
-
- sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- // Here previous created session will be returnet to session pool
- session1 = sessionResult.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- sessionIds.push_back(session1.GetId());
- }
-
- if (RandomNumber<ui32>(10) == 5) {
- client.Stop().Apply([client](NThreading::TFuture<void> future){
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0);
- return future;
- }).Wait();
- } else {
- client.Stop().Wait();
- }
-
- if (iterations & 4) {
- driver.Stop(true);
- }
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(sessionId);
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
- }
- }
-
- Y_UNIT_TEST(CloseSessionWithSessionPoolExplicitDriverStopOnly) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- TVector<TString> sessionIds;
- int iterations = 100;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- //TODO: remove this scope after session tracker implementation
- {
- auto sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
-
- sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- // Here previous created session will be returnet to session pool
- session1 = sessionResult.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- sessionIds.push_back(session1.GetId());
- }
- driver.Stop(true);
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(sessionId);
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
- }
- }
-
- Y_UNIT_TEST(CloseSessionWithSessionPoolFromDtors) {
- NKikimrConfig::TAppConfig appConfig;
-
- TKikimrWithGrpcAndRootSchema server(appConfig);
-
- TVector<TString> sessionIds;
- int iterations = 100;
-
- while (iterations--) {
- NYdb::TDriver driver(TDriverConfig()
- .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
- NYdb::NTable::TTableClient client(driver);
- //TODO: remove this scope after session tracker implementation
- {
- auto sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- auto session1 = sessionResult.GetSession();
- sessionIds.push_back(session1.GetId());
-
- sessionResult = client.GetSession().ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
- // Here previous created session will be returnet to session pool
- session1 = sessionResult.GetSession();
- UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
- sessionIds.push_back(session1.GetId());
- }
- }
-
- std::shared_ptr<grpc::Channel> channel;
- channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials());
- auto stub = Ydb::Table::V1::TableService::NewStub(channel);
- for (const auto& sessionId : sessionIds) {
- grpc::ClientContext context;
- Ydb::Table::KeepAliveRequest request;
- request.set_session_id(sessionId);
- Ydb::Table::KeepAliveResponse response;
- auto status = stub->KeepAlive(&context, request, &response);
- UNIT_ASSERT(status.ok());
- auto deferred = response.operation();
- UNIT_ASSERT(deferred.ready() == true);
- UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION);
- }
- }
-
Y_UNIT_TEST(DeleteTableWithDeletedIndex) {
TKikimrWithGrpcAndRootSchema server;