diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-03-01 18:24:07 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-03-01 18:24:07 +0300 |
commit | 73d7c9861df84ecfff5cb0b23ec96e41866fc4c4 (patch) | |
tree | 77d8ccad837fb93b0652ffc5726cd64a5b9d465f | |
parent | 257d958d39fcc443f08920bfe8c1c87b760e1716 (diff) | |
download | ydb-73d7c9861df84ecfff5cb0b23ec96e41866fc4c4.tar.gz |
Move sdk sessions tests in to separate directory to reduce timeout impact.
Otherwise there is small possibility starting session keep alive tasks during the run.
-rw-r--r-- | ydb/services/ydb/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/services/ydb/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/services/ydb/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt | 80 | ||||
-rw-r--r-- | ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt | 82 | ||||
-rw-r--r-- | ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt | 84 | ||||
-rw-r--r-- | ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt | 15 | ||||
-rw-r--r-- | ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp | 641 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_table_ut.cpp | 626 |
9 files changed, 905 insertions, 626 deletions
diff --git a/ydb/services/ydb/CMakeLists.darwin.txt b/ydb/services/ydb/CMakeLists.darwin.txt index 58fbeb4c3c..813d577ed8 100644 --- a/ydb/services/ydb/CMakeLists.darwin.txt +++ b/ydb/services/ydb/CMakeLists.darwin.txt @@ -9,6 +9,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(index_ut) add_subdirectory(sdk_credprovider_ut) +add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) diff --git a/ydb/services/ydb/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/CMakeLists.linux-aarch64.txt index 788ede8cdf..2f77c61e3d 100644 --- a/ydb/services/ydb/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ydb/CMakeLists.linux-aarch64.txt @@ -9,6 +9,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(index_ut) add_subdirectory(sdk_credprovider_ut) +add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) diff --git a/ydb/services/ydb/CMakeLists.linux.txt b/ydb/services/ydb/CMakeLists.linux.txt index 788ede8cdf..2f77c61e3d 100644 --- a/ydb/services/ydb/CMakeLists.linux.txt +++ b/ydb/services/ydb/CMakeLists.linux.txt @@ -9,6 +9,7 @@ find_package(OpenSSL REQUIRED) add_subdirectory(index_ut) add_subdirectory(sdk_credprovider_ut) +add_subdirectory(sdk_sessions_ut) add_subdirectory(table_split_ut) add_subdirectory(ut) diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt new file mode 100644 index 0000000000..a55fad0b0d --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.darwin.txt @@ -0,0 +1,80 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ydb-sdk_sessions_ut) +target_compile_options(ydb-services-ydb-sdk_sessions_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ydb-sdk_sessions_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb +) +target_link_libraries(ydb-services-ydb-sdk_sessions_ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ydb + cpp-grpc-client + core-testlib-default + ydb-core-testlib + cpp-client-ydb_table +) +target_link_options(ydb-services-ydb-sdk_sessions_ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-services-ydb-sdk_sessions_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp +) +set_property( + TARGET + ydb-services-ydb-sdk_sessions_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-ydb-sdk_sessions_ut + TEST_TARGET + ydb-services-ydb-sdk_sessions_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + TIMEOUT + 300 +) +vcs_info(ydb-services-ydb-sdk_sessions_ut) diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..05e4326c4a --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,82 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ydb-sdk_sessions_ut) +target_compile_options(ydb-services-ydb-sdk_sessions_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ydb-sdk_sessions_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb +) +target_link_libraries(ydb-services-ydb-sdk_sessions_ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-testing-unittest_main + ydb-services-ydb + cpp-grpc-client + core-testlib-default + ydb-core-testlib + cpp-client-ydb_table +) +target_link_options(ydb-services-ydb-sdk_sessions_ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-ydb-sdk_sessions_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp +) +set_property( + TARGET + ydb-services-ydb-sdk_sessions_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-ydb-sdk_sessions_ut + TEST_TARGET + ydb-services-ydb-sdk_sessions_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + TIMEOUT + 300 +) +vcs_info(ydb-services-ydb-sdk_sessions_ut) diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt new file mode 100644 index 0000000000..632434c626 --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.linux.txt @@ -0,0 +1,84 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ydb-sdk_sessions_ut) +target_compile_options(ydb-services-ydb-sdk_sessions_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ydb-sdk_sessions_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb +) +target_link_libraries(ydb-services-ydb-sdk_sessions_ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ydb + cpp-grpc-client + core-testlib-default + ydb-core-testlib + cpp-client-ydb_table +) +target_link_options(ydb-services-ydb-sdk_sessions_ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-ydb-sdk_sessions_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp +) +set_property( + TARGET + ydb-services-ydb-sdk_sessions_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-ydb-sdk_sessions_ut + TEST_TARGET + ydb-services-ydb-sdk_sessions_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ydb-sdk_sessions_ut + PROPERTY + TIMEOUT + 300 +) +vcs_info(ydb-services-ydb-sdk_sessions_ut) diff --git a/ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt new file mode 100644 index 0000000000..5bb4faffb4 --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_ut/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp new file mode 100644 index 0000000000..d4b82e3ab5 --- /dev/null +++ b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp @@ -0,0 +1,641 @@ +#include "ydb_common_ut.h" +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h> + +using namespace NYdb; +using namespace NYdb::NTable; + +Y_UNIT_TEST_SUITE(YdbSdkSessions) { + Y_UNIT_TEST(TestSessionPool) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + const TString location = TStringBuilder() << "localhost:" << grpc; + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + THashSet<TString> sids; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); + auto session = sessionResponse.GetSession(); + sids.insert(session.GetId()); + auto result = session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(result.GetEndpoint(), location); + } + // All requests used one session + UNIT_ASSERT_VALUES_EQUAL(sids.size(), 1); + // No more session captured by client + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + + driver.Stop(true); + } + + Y_UNIT_TEST(TestMultipleSessions) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + TVector<TSession> sids; + TVector<TAsyncDataQueryResult> results; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); + auto session = sessionResponse.GetSession(); + sids.push_back(session); + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } + + NThreading::WaitExceptionOrAll(results).Wait(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10); + + for (auto& result : results) { + UNIT_ASSERT_EQUAL(result.GetValue().GetStatus(), EStatus::SUCCESS); + } + sids.clear(); + results.clear(); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + + driver.Stop(true); + } + + Y_UNIT_TEST(TestActiveSessionCountAfterBadSession) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 10; + + TVector<TSession> sids; + TVector<TAsyncDataQueryResult> results; + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); + auto session = sessionResponse.GetSession(); + sids.push_back(session); + if (count == 0) { + // Force BAD session server response for ExecuteDataQuery + UNIT_ASSERT_EQUAL(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS); + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } else { + results.push_back(session.ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } + } + + NThreading::WaitExceptionOrAll(results).Wait(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10); + + for (size_t i = 0; i < results.size(); i++) { + if (i == 9) { + UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION); + } else { + UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::SUCCESS); + } + } + sids.clear(); + results.clear(); + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + + driver.Stop(true); + } + + Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + int count = 100; + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `Root/Test` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ); + )___").ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + } + + while (count--) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); + auto session = sessionResponse.GetSession(); + + // Assume 10us is too small to execute query and get response + auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), + NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + } + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + driver.Stop(true); + } + + Y_UNIT_TEST(MultiThreadSync) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + NYdb::NTable::TTableClient client(driver); + const int nThreads = 10; + const int nRequests = 1000; + auto job = [client]() mutable { + for (int i = 0; i < nRequests; i++) { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_EQUAL(sessionResponse.GetStatus(), EStatus::SUCCESS); + } + }; + IThreadFactory* pool = SystemThreadFactory(); + + TVector<TAutoPtr<IThreadFactory::IThread>> threads; + threads.resize(nThreads); + for (int i = 0; i < nThreads; i++) { + threads[i] = pool->Run(job); + } + for (int i = 0; i < nThreads; i++) { + threads[i]->Join(); + } + UNIT_ASSERT_EQUAL(client.GetActiveSessionCount(), 0); + driver.Stop(true); + } + + Y_UNIT_TEST(MultiThreadSessionPoolLimitSync) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + const int maxActiveSessions = 45; + NYdb::NTable::TTableClient client(driver, + TClientSettings() + .SessionPoolSettings( + TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + + constexpr int nThreads = 100; + NYdb::EStatus statuses[nThreads]; + TVector<TMaybe<NYdb::NTable::TSession>> sessions; + sessions.resize(nThreads); + TAtomic t = 0; + auto job = [client, &t, &statuses, &sessions]() mutable { + auto sessionResponse = client.GetSession().ExtractValueSync(); + int i = AtomicIncrement(t); + statuses[--i] = sessionResponse.GetStatus(); + if (statuses[i] == EStatus::SUCCESS) { + auto execStatus = sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus(); + UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS); + sessions[i] = sessionResponse.GetSession(); + } + }; + IThreadFactory* pool = SystemThreadFactory(); + + TVector<TAutoPtr<IThreadFactory::IThread>> threads; + threads.resize(nThreads); + for (int i = 0; i < nThreads; i++) { + threads[i] = pool->Run(job); + } + for (int i = 0; i < nThreads; i++) { + threads[i]->Join(); + } + + sessions.clear(); + + int successCount = 0; + int exhaustedCount = 0; + for (int i = 0; i < nThreads; i++) { + switch (statuses[i]) { + case EStatus::SUCCESS: + successCount++; + break; + case EStatus::CLIENT_RESOURCE_EXHAUSTED: + exhaustedCount++; + break; + default: + UNIT_ASSERT(false); + } + } + + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(successCount, maxActiveSessions); + UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, nThreads - maxActiveSessions); + driver.Stop(true); + } + + Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessions) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + const int maxActiveSessions = 10; + NYdb::NTable::TTableClient client(driver, + TClientSettings() + .SessionPoolSettings( + TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); + + constexpr int nThreads = 20; + constexpr int nRequests = 50; + std::array<TVector<TAsyncDataQueryResult>, nThreads> results; + TAtomic t = 0; + TAtomic validSessions = 0; + auto job = [client, &t, &results, &validSessions]() mutable { + auto sessionResponse = client.GetSession().ExtractValueSync(); + + int i = AtomicIncrement(t); + TVector<TAsyncDataQueryResult>& r = results[--i]; + + if (sessionResponse.GetStatus() != EStatus::SUCCESS) { + return; + } + AtomicIncrement(validSessions); + + for (int i = 0; i < nRequests; i++) { + r.push_back(sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;", + TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); + } + }; + IThreadFactory* pool = SystemThreadFactory(); + + TVector<TAutoPtr<IThreadFactory::IThread>> threads; + threads.resize(nThreads); + for (int i = 0; i < nThreads; i++) { + threads[i] = pool->Run(job); + } + for (int i = 0; i < nThreads; i++) { + threads[i]->Join(); + } + + for (auto& r : results) { + NThreading::WaitExceptionOrAll(r).Wait(); + } + for (auto& r : results) { + if (!r.empty()) { + int ok = 0; + int bad = 0; + for (auto& asyncStatus : r) { + auto res = asyncStatus.GetValue(); + if (res.IsSuccess()) { + ok++; + } else { + UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SESSION_BUSY); + bad++; + } + } + //UNIT_ASSERT_VALUES_EQUAL(ok, 1); + //UNIT_ASSERT_VALUES_EQUAL(bad, nRequests - 1); + } + } + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), maxActiveSessions); + auto curExpectedActive = maxActiveSessions; + auto empty = 0; + for (auto& r : results) { + if (!r.empty()) { + r.clear(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), --curExpectedActive); + } else { + empty++; + } + } + UNIT_ASSERT_VALUES_EQUAL(empty, nThreads - maxActiveSessions); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + driver.Stop(true); + } + + Y_UNIT_TEST(SessionsServerLimit) { + NKikimrConfig::TAppConfig appConfig; + auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); + tableServiceConfig.SetSessionsLimitPerNode(2); + + TKikimrWithGrpcAndRootSchema server(appConfig); + + NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + + sessionResult = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session2 = sessionResult.GetSession(); + + sessionResult = client.CreateSession().ExtractValueSync(); + sessionResult.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED); + + auto status = session1.Close().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS); + + auto result = session2.ExecuteDataQuery(R"___( + SELECT 1; + )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + sessionResult = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + + sessionResult = client.CreateSession().ExtractValueSync(); + sessionResult.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + } + + Y_UNIT_TEST(SessionsServerLimitWithSessionPool) { + NKikimrConfig::TAppConfig appConfig; + auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); + tableServiceConfig.SetSessionsLimitPerNode(2); + + TKikimrWithGrpcAndRootSchema server(appConfig); + + NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + NYdb::NTable::TTableClient client(driver); + auto sessionResult1 = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + auto session1 = sessionResult1.GetSession(); + + auto sessionResult2 = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(sessionResult2.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); + auto session2 = sessionResult2.GetSession(); + + { + auto sessionResult3 = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3); + } + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); + + auto status = session1.Close().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS); + + // Close doesnt free session from user perspective, + // the value of ActiveSessionsCounter will be same after Close() call. + // Probably we want to chenge this contract + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); + + auto result = session2.ExecuteDataQuery(R"___( + SELECT 1; + )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + sessionResult1 = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetSession().GetId().empty(), false); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3); + + auto sessionResult3 = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4); + + auto tmp = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 5); + sessionResult1 = tmp; // here we reset previous created session object, + // so perform close rpc call implicitly and delete it + UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::OVERLOADED); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4); + } + + Y_UNIT_TEST(CloseSessionAfterDriverDtorWithoutSessionPool) { + NKikimrConfig::TAppConfig appConfig; + + TKikimrWithGrpcAndRootSchema server(appConfig); + + TVector<TString> sessionIds; + int iterations = 50; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + NYdb::NTable::TTableClient client(driver); + auto sessionResult = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + } + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + UNIT_ASSERT(status.ok()); + auto deferred = response.operation(); + UNIT_ASSERT(deferred.ready() == true); + UNIT_ASSERT_VALUES_EQUAL(deferred.status(), Ydb::StatusIds::BAD_SESSION); + } + } + + Y_UNIT_TEST(CloseSessionWithSessionPoolExplicit) { + NKikimrConfig::TAppConfig appConfig; + + TKikimrWithGrpcAndRootSchema server(appConfig); + + TVector<TString> sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + + if (RandomNumber<ui32>(10) == 5) { + client.Stop().Apply([client](NThreading::TFuture<void> future){ + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); + return future; + }).Wait(); + } else { + client.Stop().Wait(); + } + + if (iterations & 4) { + driver.Stop(true); + } + } + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + UNIT_ASSERT(status.ok()); + auto deferred = response.operation(); + UNIT_ASSERT(deferred.ready() == true); + UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); + } + } + + Y_UNIT_TEST(CloseSessionWithSessionPoolExplicitDriverStopOnly) { + NKikimrConfig::TAppConfig appConfig; + + TKikimrWithGrpcAndRootSchema server(appConfig); + + TVector<TString> sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + driver.Stop(true); + } + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + UNIT_ASSERT(status.ok()); + auto deferred = response.operation(); + UNIT_ASSERT(deferred.ready() == true); + UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); + } + } + + Y_UNIT_TEST(CloseSessionWithSessionPoolFromDtors) { + NKikimrConfig::TAppConfig appConfig; + + TKikimrWithGrpcAndRootSchema server(appConfig); + + TVector<TString> sessionIds; + int iterations = 100; + + while (iterations--) { + NYdb::TDriver driver(TDriverConfig() + .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); + NYdb::NTable::TTableClient client(driver); + //TODO: remove this scope after session tracker implementation + { + auto sessionResult = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + auto session1 = sessionResult.GetSession(); + sessionIds.push_back(session1.GetId()); + + sessionResult = client.GetSession().ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); + // Here previous created session will be returnet to session pool + session1 = sessionResult.GetSession(); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); + sessionIds.push_back(session1.GetId()); + } + } + + std::shared_ptr<grpc::Channel> channel; + channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); + auto stub = Ydb::Table::V1::TableService::NewStub(channel); + for (const auto& sessionId : sessionIds) { + grpc::ClientContext context; + Ydb::Table::KeepAliveRequest request; + request.set_session_id(sessionId); + Ydb::Table::KeepAliveResponse response; + auto status = stub->KeepAlive(&context, request, &response); + UNIT_ASSERT(status.ok()); + auto deferred = response.operation(); + UNIT_ASSERT(deferred.ready() == true); + UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); + } + } +} diff --git a/ydb/services/ydb/ydb_table_ut.cpp b/ydb/services/ydb/ydb_table_ut.cpp index 2aa8cf08b6..abdf23ece7 100644 --- a/ydb/services/ydb/ydb_table_ut.cpp +++ b/ydb/services/ydb/ydb_table_ut.cpp @@ -239,347 +239,6 @@ Y_UNIT_TEST_SUITE(YdbYqlClient) { UNIT_ASSERT_EQUAL(status.GetStatus(), EStatus::SUCCESS); } - Y_UNIT_TEST(TestSessionPool) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - const TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 10; - - THashSet<TString> sids; - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - sids.insert(session.GetId()); - auto result = session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(result.GetEndpoint(), location); - } - // All requests used one session - UNIT_ASSERT_VALUES_EQUAL(sids.size(), 1); - // No more session captured by client - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - } - - Y_UNIT_TEST(TestMultipleSessions) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 10; - - TVector<TSession> sids; - TVector<TAsyncDataQueryResult> results; - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - sids.push_back(session); - results.push_back(session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); - } - - NThreading::WaitExceptionOrAll(results).Wait(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10); - - for (auto& result : results) { - UNIT_ASSERT_EQUAL(result.GetValue().GetStatus(), EStatus::SUCCESS); - } - sids.clear(); - results.clear(); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - } - - Y_UNIT_TEST(TestActiveSessionCountAfterBadSession) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 10; - - TVector<TSession> sids; - TVector<TAsyncDataQueryResult> results; - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - sids.push_back(session); - if (count == 0) { - // Force BAD session server response for ExecuteDataQuery - UNIT_ASSERT_EQUAL(session.Close().GetValueSync().GetStatus(), EStatus::SUCCESS); - results.push_back(session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); - } else { - results.push_back(session.ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); - } - } - - NThreading::WaitExceptionOrAll(results).Wait(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 10); - - for (size_t i = 0; i < results.size(); i++) { - if (i == 9) { - UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::BAD_SESSION); - } else { - UNIT_ASSERT_EQUAL(results[i].GetValue().GetStatus(), EStatus::SUCCESS); - } - } - sids.clear(); - results.clear(); - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - } - - Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - int count = 100; - - { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT(sessionResponse.IsSuccess()); - auto session = sessionResponse.GetSession(); - auto result = session.ExecuteSchemeQuery(R"___( - CREATE TABLE `Root/Test` ( - Key Uint32, - Value String, - PRIMARY KEY (Key) - ); - )___").ExtractValueSync(); - UNIT_ASSERT(result.IsSuccess()); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - } - - while (count--) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.IsTransportError(), false); - auto session = sessionResponse.GetSession(); - - // Assume 10us is too small to execute query and get response - auto res = session.ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), - NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - } - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - driver.Stop(true); - } - - Y_UNIT_TEST(MultiThreadSync) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - NYdb::NTable::TTableClient client(driver); - const int nThreads = 10; - const int nRequests = 1000; - auto job = [client]() mutable { - for (int i = 0; i < nRequests; i++) { - auto sessionResponse = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_EQUAL(sessionResponse.GetStatus(), EStatus::SUCCESS); - } - }; - IThreadFactory* pool = SystemThreadFactory(); - - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - threads.resize(nThreads); - for (int i = 0; i < nThreads; i++) { - threads[i] = pool->Run(job); - } - for (int i = 0; i < nThreads; i++) { - threads[i]->Join(); - } - UNIT_ASSERT_EQUAL(client.GetActiveSessionCount(), 0); - driver.Stop(true); - } - - Y_UNIT_TEST(MultiThreadSessionPoolLimitSync) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - const int maxActiveSessions = 45; - NYdb::NTable::TTableClient client(driver, - TClientSettings() - .SessionPoolSettings( - TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); - - constexpr int nThreads = 100; - NYdb::EStatus statuses[nThreads]; - TVector<TMaybe<NYdb::NTable::TSession>> sessions; - sessions.resize(nThreads); - TAtomic t = 0; - auto job = [client, &t, &statuses, &sessions]() mutable { - auto sessionResponse = client.GetSession().ExtractValueSync(); - int i = AtomicIncrement(t); - statuses[--i] = sessionResponse.GetStatus(); - if (statuses[i] == EStatus::SUCCESS) { - auto execStatus = sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync().GetStatus(); - UNIT_ASSERT_EQUAL(execStatus, EStatus::SUCCESS); - sessions[i] = sessionResponse.GetSession(); - } - }; - IThreadFactory* pool = SystemThreadFactory(); - - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - threads.resize(nThreads); - for (int i = 0; i < nThreads; i++) { - threads[i] = pool->Run(job); - } - for (int i = 0; i < nThreads; i++) { - threads[i]->Join(); - } - - sessions.clear(); - - int successCount = 0; - int exhaustedCount = 0; - for (int i = 0; i < nThreads; i++) { - switch (statuses[i]) { - case EStatus::SUCCESS: - successCount++; - break; - case EStatus::CLIENT_RESOURCE_EXHAUSTED: - exhaustedCount++; - break; - default: - UNIT_ASSERT(false); - } - } - - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(successCount, maxActiveSessions); - UNIT_ASSERT_VALUES_EQUAL(exhaustedCount, nThreads - maxActiveSessions); - driver.Stop(true); - } - - Y_UNIT_TEST(MultiThreadMultipleRequestsOnSharedSessions) { - TKikimrWithGrpcAndRootSchema server; - ui16 grpc = server.GetPort(); - - TString location = TStringBuilder() << "localhost:" << grpc; - - auto driver = NYdb::TDriver( - TDriverConfig() - .SetEndpoint(location)); - - const int maxActiveSessions = 10; - NYdb::NTable::TTableClient client(driver, - TClientSettings() - .SessionPoolSettings( - TSessionPoolSettings().MaxActiveSessions(maxActiveSessions))); - - constexpr int nThreads = 20; - constexpr int nRequests = 50; - std::array<TVector<TAsyncDataQueryResult>, nThreads> results; - TAtomic t = 0; - TAtomic validSessions = 0; - auto job = [client, &t, &results, &validSessions]() mutable { - auto sessionResponse = client.GetSession().ExtractValueSync(); - - int i = AtomicIncrement(t); - TVector<TAsyncDataQueryResult>& r = results[--i]; - - if (sessionResponse.GetStatus() != EStatus::SUCCESS) { - return; - } - AtomicIncrement(validSessions); - - for (int i = 0; i < nRequests; i++) { - r.push_back(sessionResponse.GetSession().ExecuteDataQuery("SELECT 42;", - TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx())); - } - }; - IThreadFactory* pool = SystemThreadFactory(); - - TVector<TAutoPtr<IThreadFactory::IThread>> threads; - threads.resize(nThreads); - for (int i = 0; i < nThreads; i++) { - threads[i] = pool->Run(job); - } - for (int i = 0; i < nThreads; i++) { - threads[i]->Join(); - } - - for (auto& r : results) { - NThreading::WaitExceptionOrAll(r).Wait(); - } - for (auto& r : results) { - if (!r.empty()) { - int ok = 0; - int bad = 0; - for (auto& asyncStatus : r) { - auto res = asyncStatus.GetValue(); - if (res.IsSuccess()) { - ok++; - } else { - UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), EStatus::SESSION_BUSY); - bad++; - } - } - //UNIT_ASSERT_VALUES_EQUAL(ok, 1); - //UNIT_ASSERT_VALUES_EQUAL(bad, nRequests - 1); - } - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), maxActiveSessions); - auto curExpectedActive = maxActiveSessions; - auto empty = 0; - for (auto& r : results) { - if (!r.empty()) { - r.clear(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), --curExpectedActive); - } else { - empty++; - } - } - UNIT_ASSERT_VALUES_EQUAL(empty, nThreads - maxActiveSessions); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - driver.Stop(true); - } - Y_UNIT_TEST(TestColumnOrder) { TKikimrWithGrpcAndRootSchema server; ui16 grpc = server.GetPort(); @@ -2287,291 +1946,6 @@ R"___(<main>: Error: Transaction not found: , code: 2015 UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); } - Y_UNIT_TEST(SessionsServerLimit) { - NKikimrConfig::TAppConfig appConfig; - auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); - tableServiceConfig.SetSessionsLimitPerNode(2); - - TKikimrWithGrpcAndRootSchema server(appConfig); - - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - auto sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - - sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session2 = sessionResult.GetSession(); - - sessionResult = client.CreateSession().ExtractValueSync(); - sessionResult.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED); - - auto status = session1.Close().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS); - - auto result = session2.ExecuteDataQuery(R"___( - SELECT 1; - )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - - sessionResult = client.CreateSession().ExtractValueSync(); - sessionResult.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - } - - Y_UNIT_TEST(SessionsServerLimitWithSessionPool) { - NKikimrConfig::TAppConfig appConfig; - auto& tableServiceConfig = *appConfig.MutableTableServiceConfig(); - tableServiceConfig.SetSessionsLimitPerNode(2); - - TKikimrWithGrpcAndRootSchema server(appConfig); - - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - auto sessionResult1 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - auto session1 = sessionResult1.GetSession(); - - auto sessionResult2 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult2.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - auto session2 = sessionResult2.GetSession(); - - { - auto sessionResult3 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3); - } - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - - auto status = session1.Close().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(status.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(status.GetStatus(), EStatus::SUCCESS); - - // Close doesnt free session from user perspective, - // the value of ActiveSessionsCounter will be same after Close() call. - // Probably we want to chenge this contract - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - - auto result = session2.ExecuteDataQuery(R"___( - SELECT 1; - )___", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - sessionResult1 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetSession().GetId().empty(), false); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 3); - - auto sessionResult3 = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(sessionResult3.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4); - - auto tmp = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 5); - sessionResult1 = tmp; // here we reset previous created session object, - // so perform close rpc call implicitly and delete it - UNIT_ASSERT_VALUES_EQUAL(sessionResult1.GetStatus(), EStatus::OVERLOADED); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 4); - } - - Y_UNIT_TEST(CloseSessionAfterDriverDtorWithoutSessionPool) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - TVector<TString> sessionIds; - int iterations = 50; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - auto sessionResult = client.CreateSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(sessionId); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT_VALUES_EQUAL(deferred.status(), Ydb::StatusIds::BAD_SESSION); - } - } - - Y_UNIT_TEST(CloseSessionWithSessionPoolExplicit) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - TVector<TString> sessionIds; - int iterations = 100; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - //TODO: remove this scope after session tracker implementation - { - auto sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - - sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - // Here previous created session will be returnet to session pool - session1 = sessionResult.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - sessionIds.push_back(session1.GetId()); - } - - if (RandomNumber<ui32>(10) == 5) { - client.Stop().Apply([client](NThreading::TFuture<void> future){ - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 0); - return future; - }).Wait(); - } else { - client.Stop().Wait(); - } - - if (iterations & 4) { - driver.Stop(true); - } - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(sessionId); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); - } - } - - Y_UNIT_TEST(CloseSessionWithSessionPoolExplicitDriverStopOnly) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - TVector<TString> sessionIds; - int iterations = 100; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - //TODO: remove this scope after session tracker implementation - { - auto sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - - sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - // Here previous created session will be returnet to session pool - session1 = sessionResult.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - sessionIds.push_back(session1.GetId()); - } - driver.Stop(true); - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(sessionId); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); - } - } - - Y_UNIT_TEST(CloseSessionWithSessionPoolFromDtors) { - NKikimrConfig::TAppConfig appConfig; - - TKikimrWithGrpcAndRootSchema server(appConfig); - - TVector<TString> sessionIds; - int iterations = 100; - - while (iterations--) { - NYdb::TDriver driver(TDriverConfig() - .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort())); - NYdb::NTable::TTableClient client(driver); - //TODO: remove this scope after session tracker implementation - { - auto sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - auto session1 = sessionResult.GetSession(); - sessionIds.push_back(session1.GetId()); - - sessionResult = client.GetSession().ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS); - // Here previous created session will be returnet to session pool - session1 = sessionResult.GetSession(); - UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); - sessionIds.push_back(session1.GetId()); - } - } - - std::shared_ptr<grpc::Channel> channel; - channel = grpc::CreateChannel("localhost:" + ToString(server.GetPort()), grpc::InsecureChannelCredentials()); - auto stub = Ydb::Table::V1::TableService::NewStub(channel); - for (const auto& sessionId : sessionIds) { - grpc::ClientContext context; - Ydb::Table::KeepAliveRequest request; - request.set_session_id(sessionId); - Ydb::Table::KeepAliveResponse response; - auto status = stub->KeepAlive(&context, request, &response); - UNIT_ASSERT(status.ok()); - auto deferred = response.operation(); - UNIT_ASSERT(deferred.ready() == true); - UNIT_ASSERT(deferred.status() == Ydb::StatusIds::BAD_SESSION); - } - } - Y_UNIT_TEST(DeleteTableWithDeletedIndex) { TKikimrWithGrpcAndRootSchema server; |