author     pixcc <pixcc@yandex-team.com>  2023-11-27 09:26:32 +0300
committer  pixcc <pixcc@yandex-team.com>  2023-11-27 09:47:55 +0300
commit     e4b7582c62f8047039915487f1815bd56ea81973 (patch)
tree       2dca1d38ee260bb911ded563c87388222a7e3512
parent     c0cdf19cd80fb6fa536681c87a677b39e965ac1a (diff)
download   ydb-e4b7582c62f8047039915487f1815bd56ea81973.tar.gz
Discover dedicated nodes for serverless db KIKIMR-20001
-rw-r--r--  ydb/core/discovery/discovery.cpp                    |  17
-rw-r--r--  ydb/core/grpc_services/grpc_request_proxy.cpp       |   6
-rw-r--r--  ydb/core/tx/scheme_cache/scheme_cache.cpp           |   1
-rw-r--r--  ydb/core/tx/scheme_cache/scheme_cache.h             |   3
-rw-r--r--  ydb/tests/functional/serverless/conftest.py         |  43
-rw-r--r--  ydb/tests/functional/serverless/test_serverless.py  | 172
-rw-r--r--  ydb/tests/library/harness/kikimr_config.py          |   8
7 files changed, 246 insertions, 4 deletions
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp
index 6e5f4f247c..fcc6f8f105 100644
--- a/ydb/core/discovery/discovery.cpp
+++ b/ydb/core/discovery/discovery.cpp
@@ -480,7 +480,7 @@ public:
}
auto info = entry.DomainInfo;
- if (info->DomainKey != info->ResourcesDomainKey) {
+ if (NeedResolveResources(info)) {
DLOG_D("Resolve resources domain"
<< ": domain key# " << info->DomainKey
<< ", resources domain key# " << info->ResourcesDomainKey);
@@ -494,6 +494,21 @@ public:
MaybeReply();
}
+ static bool NeedResolveResources(TIntrusivePtr<NSchemeCache::TDomainInfo> domainInfo) {
+ if (!domainInfo->IsServerless()) {
+ return false;
+ }
+
+ switch (domainInfo->ServerlessComputeResourcesMode) {
+ case NKikimrSubDomains::SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED:
+ return false;
+ case NKikimrSubDomains::SERVERLESS_COMPUTE_RESOURCES_MODE_SHARED:
+ return true;
+ default:
+ return true;
+ }
+ }
+
void MaybeReply() {
if (!LookupResponse || !SchemeCacheResponse) {
return;
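
For reference, the new gate can be read as a small decision table. A minimal Python sketch of it (a toy model, not the C++ code; it assumes IsServerless() matches the old DomainKey != ResourcesDomainKey check):

    def need_resolve_resources(is_serverless, mode):
        # Discovery redirects to the resources (hostel) domain only for
        # serverless databases that are not pinned to dedicated nodes.
        if not is_serverless:
            return False
        if mode == "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED":
            return False
        # SHARED and UNSPECIFIED (entries from older schemeshards) keep the
        # pre-patch behaviour and resolve the shared domain's nodes.
        return True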
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 099cb36627..6fafbb411c 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -213,10 +213,12 @@ private:
if (database->SchemeBoardResult) {
const auto& domain = database->SchemeBoardResult->DescribeSchemeResult.GetPathDescription().GetDomainDescription();
if (domain.HasResourcesDomainKey() && !skipResourceCheck && DynamicNode) {
- TSubDomainKey subdomainKey(domain.GetResourcesDomainKey());
- if (!SubDomainKeys.contains(subdomainKey)) {
+ const TSubDomainKey resourceDomainKey(domain.GetResourcesDomainKey());
+ const TSubDomainKey domainKey(domain.GetDomainKey());
+ if (!SubDomainKeys.contains(resourceDomainKey) && !SubDomainKeys.contains(domainKey)) {
TStringBuilder error;
error << "Unexpected node to perform query on database: " << databaseName
+ << ", domain: " << domain.GetDomainKey().ShortDebugString()
<< ", resource domain: " << domain.GetResourcesDomainKey().ShortDebugString();
LOG_ERROR(ctx, NKikimrServices::GRPC_SERVER, error);
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error);
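
The proxy-side check is relaxed symmetrically: a dynamic node may now serve the database when it is registered either in the resources domain (shared nodes, the old behaviour) or in the database's own subdomain (the new dedicated nodes). A condensed sketch of the predicate (hypothetical Python; sub_domain_keys stands in for the node's SubDomainKeys set):

    def node_may_serve(sub_domain_keys, domain_key, resources_domain_key):
        # The query is rejected only if the node belongs to neither the
        # shared (resources) domain nor the database's own subdomain.
        return (resources_domain_key in sub_domain_keys
                or domain_key in sub_domain_keys)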
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.cpp b/ydb/core/tx/scheme_cache/scheme_cache.cpp
index 7c867beb27..2a7bbaf541 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.cpp
+++ b/ydb/core/tx/scheme_cache/scheme_cache.cpp
@@ -31,6 +31,7 @@ TString TDomainInfo::ToString() const {
<< " DomainKey: " << DomainKey
<< " ResourcesDomainKey: " << ResourcesDomainKey
<< " Params { " << Params.ShortDebugString() << " }"
+ << " ServerlessComputeResourcesMode: " << ServerlessComputeResourcesMode
<< " }";
}
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h
index 138fc65f9c..cd7ab0f917 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.h
+++ b/ydb/core/tx/scheme_cache/scheme_cache.h
@@ -55,6 +55,7 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
: DomainKey(GetDomainKey(descr.GetDomainKey()))
, Params(descr.GetProcessingParams())
, Coordinators(descr.GetProcessingParams())
+ , ServerlessComputeResourcesMode(descr.GetServerlessComputeResourcesMode())
{
if (descr.HasResourcesDomainKey()) {
ResourcesDomainKey = GetDomainKey(descr.GetResourcesDomainKey());
@@ -83,6 +84,8 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> {
TPathId ResourcesDomainKey;
NKikimrSubDomains::TProcessingParams Params;
TCoordinators Coordinators;
+ NKikimrSubDomains::EServerlessComputeResourcesMode ServerlessComputeResourcesMode =
+ NKikimrSubDomains::SERVERLESS_COMPUTE_RESOURCES_MODE_UNSPECIFIED;
TString ToString() const;
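
The mode defaults to UNSPECIFIED, so TDomainInfo entries produced by schemeshards that predate the field fall into the `default: return true` branch of NeedResolveResources and keep the old shared-nodes behaviour. A toy model of the cached entry (hypothetical Python dataclass; field names mirror the C++ struct, and is_serverless presumably reduces to the key comparison):

    from dataclasses import dataclass

    @dataclass
    class DomainInfo:
        domain_key: tuple            # TPathId of the database itself
        resources_domain_key: tuple  # TPathId of the shared (hostel) database
        serverless_compute_resources_mode: str = \
            "SERVERLESS_COMPUTE_RESOURCES_MODE_UNSPECIFIED"

        def is_serverless(self):
            # A serverless database borrows compute from another domain.
            return self.domain_key != self.resources_domain_key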
diff --git a/ydb/tests/functional/serverless/conftest.py b/ydb/tests/functional/serverless/conftest.py
index 0566c69458..314f5f0f43 100644
--- a/ydb/tests/functional/serverless/conftest.py
+++ b/ydb/tests/functional/serverless/conftest.py
@@ -116,3 +116,46 @@ def ydb_disk_quoted_serverless_db(ydb_cluster, ydb_root, ydb_hostel_db, ydb_safe
with ydb_serverless_db_ctx(ydb_cluster, database_name, ydb_hostel_db, disk_quotas=disk_quotas):
yield database_name
+
+
+@contextlib.contextmanager
+def ydb_serverless_db_with_nodes_ctx(ydb_cluster, database, hostel_db, timeout_seconds=100):
+ logger.info("setup ydb_serverless_db_with_nodes %s using hostel %s", database, hostel_db)
+
+ ydb_cluster.remove_database(
+ database,
+ timeout_seconds=timeout_seconds
+ )
+
+ ydb_cluster.create_serverless_database(
+ database,
+ hostel_db=hostel_db,
+ timeout_seconds=timeout_seconds,
+ attributes={
+ "cloud_id": "CLOUD_ID_VAL",
+ "folder_id": "FOLDER_ID_VAL",
+ "database_id": "DATABASE_ID_VAL",
+ },
+ )
+
+ database_nodes = ydb_cluster.register_and_start_slots(database, 3)
+ ydb_cluster.wait_tenant_up(database)
+
+ try:
+ yield database
+ finally:
+ logger.info("destroy ydb_serverless_db_with_nodes for %s", database)
+ ydb_cluster.remove_database(
+ database,
+ timeout_seconds=timeout_seconds
+ )
+
+ ydb_cluster.unregister_and_stop_slots(database_nodes)
+
+
+@pytest.fixture(scope='module')
+def ydb_serverless_db_with_nodes(ydb_cluster, ydb_root, ydb_hostel_db):
+ database_name = os.path.join(ydb_root, "serverless_with_nodes")
+
+ with ydb_serverless_db_with_nodes_ctx(ydb_cluster, database_name, ydb_hostel_db):
+ yield database_name
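
Unlike the existing ydb_serverless_db fixture, this one also registers three slots directly in the serverless database, so tests can switch it to dedicated compute mode. A hypothetical usage example in the style of this suite:

    def test_example(ydb_serverless_db_with_nodes):
        # The fixture yields the database path ("<ydb_root>/serverless_with_nodes")
        # with three slots registered and the tenant already up; teardown
        # removes the database and stops the slots.
        assert ydb_serverless_db_with_nodes.endswith("serverless_with_nodes")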
diff --git a/ydb/tests/functional/serverless/test_serverless.py b/ydb/tests/functional/serverless/test_serverless.py
index fefe1a2d77..4c978b10f0 100644
--- a/ydb/tests/functional/serverless/test_serverless.py
+++ b/ydb/tests/functional/serverless/test_serverless.py
@@ -5,8 +5,9 @@ import os
import time
import copy
import pytest
+import subprocess
-from hamcrest import assert_that, contains_inanyorder, not_none
+from hamcrest import assert_that, contains_inanyorder, not_none, not_, only_contains, is_in
from tornado import gen
from tornado.ioloop import IOLoop
@@ -42,7 +43,11 @@ CLUSTER_CONFIG = dict(
'CMS': LogLevels.DEBUG,
'CMS_TENANTS': LogLevels.DEBUG,
+ 'DISCOVERY': LogLevels.TRACE,
+ 'GRPC_SERVER': LogLevels.DEBUG
},
+ enforce_user_token_requirement=True,
+ default_user_sid="user@builtin"
)
@@ -469,3 +474,168 @@ def test_discovery(ydb_hostel_db, ydb_serverless_db, ydb_endpoint):
assert_that(hostel_db_endpoints, not_none())
assert_that(serverless_db_endpoints, not_none())
assert_that(serverless_db_endpoints, contains_inanyorder(*hostel_db_endpoints))
+
+
+def ydbcli_db_schema_exec(cluster, operation_proto):
+ endpoint = f'{cluster.nodes[1].host}:{cluster.nodes[1].port}'
+ args = [
+ cluster.nodes[1].binary_path,
+ f'--server=grpc://{endpoint}',
+ 'db',
+ 'schema',
+ 'exec',
+ operation_proto,
+ ]
+ r = subprocess.run(args, capture_output=True)
+ assert r.returncode == 0, r.stderr.decode('utf-8')
+
+
+def alter_database_serverless_compute_resources_mode(cluster, database_path, serverless_compute_resources_mode):
+ alter_proto = r'''ModifyScheme {
+ OperationType: ESchemeOpAlterExtSubDomain
+ WorkingDir: "%s"
+ SubDomain {
+ Name: "%s"
+ ServerlessComputeResourcesMode: %s
+ }
+ }''' % (
+ os.path.dirname(database_path),
+ os.path.basename(database_path),
+ serverless_compute_resources_mode
+ )
+
+ ydbcli_db_schema_exec(cluster, alter_proto)
+
+
+def test_discovery_dedicated_nodes(ydb_hostel_db, ydb_serverless_db_with_nodes, ydb_endpoint, ydb_cluster):
+ def list_endpoints(database):
+ logger.debug("List endpoints of %s", database)
+
+ driver_config = ydb.DriverConfig(ydb_endpoint, database)
+ with ydb.Driver(driver_config) as driver:
+ driver.wait(120)
+
+ resolver = ydb.DiscoveryEndpointsResolver(driver_config)
+ result = resolver.resolve()
+ if result is not None:
+ return result.endpoints
+ return result
+
+ alter_database_serverless_compute_resources_mode(
+ ydb_cluster,
+ ydb_serverless_db_with_nodes,
+ "SERVERLESS_COMPUTE_RESOURCES_MODE_SHARED"
+ )
+ serverless_db_shared_endpoints = list_endpoints(ydb_serverless_db_with_nodes)
+ hostel_db_endpoints = list_endpoints(ydb_hostel_db)
+
+ assert_that(hostel_db_endpoints, not_none())
+ assert_that(serverless_db_shared_endpoints, not_none())
+ assert_that(serverless_db_shared_endpoints, contains_inanyorder(*hostel_db_endpoints))
+
+ alter_database_serverless_compute_resources_mode(
+ ydb_cluster,
+ ydb_serverless_db_with_nodes,
+ "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED"
+ )
+ serverless_db_dedicated_endpoints = list_endpoints(ydb_serverless_db_with_nodes)
+
+ assert_that(serverless_db_dedicated_endpoints, not_none())
+ assert_that(serverless_db_dedicated_endpoints, only_contains(not_(is_in(serverless_db_shared_endpoints))))
+
+
+def test_create_table_using_dedicated_nodes(ydb_hostel_db, ydb_serverless_db_with_nodes, ydb_endpoint, ydb_cluster):
+ alter_database_serverless_compute_resources_mode(
+ ydb_cluster,
+ ydb_serverless_db_with_nodes,
+ "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED"
+ )
+
+ database = ydb_serverless_db_with_nodes
+ driver_config = ydb.DriverConfig(ydb_endpoint, database)
+ driver = ydb.Driver(driver_config)
+ driver.wait(120)
+
+ dir_path = os.path.join(database, "dir")
+ driver.scheme_client.make_directory(dir_path)
+
+ with ydb.SessionPool(driver) as pool:
+ def create_table(session, path):
+ session.create_table(
+ path,
+ ydb.TableDescription()
+ .with_column(ydb.Column("id", ydb.OptionalType(ydb.DataType.Uint64)))
+ .with_primary_key("id")
+ )
+
+ def write_some_data(session, path):
+ session.transaction().execute(
+ f"UPSERT INTO `{path}` (id) VALUES (1), (2), (3);",
+ commit_tx=True)
+
+ def drop_table(session, path):
+ session.drop_table(
+ path
+ )
+
+ table_path = os.path.join(dir_path, "create_table_with_dedicated_nodes_table")
+ pool.retry_operation_sync(create_table, None, table_path)
+ pool.retry_operation_sync(write_some_data, None, table_path)
+ pool.retry_operation_sync(drop_table, None, table_path)
+
+
+def test_seamless_migration_to_dedicated_nodes(ydb_hostel_db, ydb_serverless_db_with_nodes, ydb_endpoint, ydb_cluster):
+ alter_database_serverless_compute_resources_mode(
+ ydb_cluster,
+ ydb_serverless_db_with_nodes,
+ "SERVERLESS_COMPUTE_RESOURCES_MODE_SHARED"
+ )
+
+ database = ydb_serverless_db_with_nodes
+ driver_config = ydb.DriverConfig(ydb_endpoint, database)
+ driver = ydb.Driver(driver_config)
+ driver.wait(120)
+
+ session = driver.table_client.session().create()
+ path = os.path.join(database, "seamless_migration_table")
+ session.create_table(
+ path,
+ ydb.TableDescription()
+ .with_column(ydb.Column("id", ydb.OptionalType(ydb.DataType.Uint64)))
+ .with_primary_key("id")
+ )
+
+ session.transaction().execute(
+ f"UPSERT INTO `{path}` (id) VALUES (1), (2), (3);",
+ commit_tx=True
+ )
+
+ alter_database_serverless_compute_resources_mode(
+ ydb_cluster,
+ ydb_serverless_db_with_nodes,
+ "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED"
+ )
+
+ # The old session keeps working fine over its existing connections to shared nodes
+ session.transaction().execute(
+ f"UPSERT INTO `{path}` (id) VALUES (4), (5), (6);",
+ commit_tx=True
+ )
+
+ # Force rediscovery
+ newDriver = ydb.Driver(driver_config)
+ newDriver.wait(120)
+ session._driver = newDriver
+
+ # Old session works fine with new connections to dedicated nodes
+ session.transaction().execute(
+ f"UPSERT INTO `{path}` (id) VALUES (7), (8), (9);",
+ commit_tx=True
+ )
+
+ # New session works fine
+ newSession = newDriver.table_client.session().create()
+ newSession.transaction().execute(
+ f"UPSERT INTO `{path}` (id) VALUES (10), (11), (12);",
+ commit_tx=True
+ )
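
Together the three tests cover the whole switch: SHARED must serve the hostel endpoints, DEDICATED must serve a disjoint endpoint set, and live sessions must survive the migration. The core of the flow condenses to (illustrative sketch; the path is an example):

    # Illustrative condensation of the test flow above.
    alter_database_serverless_compute_resources_mode(
        ydb_cluster,
        "/Root/serverless_with_nodes",
        "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED",
    )
    # A driver created after the switch rediscovers endpoints; they should
    # all come from the database's own slots, none from the hostel db.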
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index e5204302d5..41b409d66b 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -158,6 +158,8 @@ class KikimrConfigGenerator(object):
extra_feature_flags=None, # list[str]
extra_grpc_services=None, # list[str]
hive_config=None,
+ enforce_user_token_requirement=False,
+ default_user_sid=None
):
if extra_feature_flags is None:
extra_feature_flags = []
@@ -366,6 +368,12 @@ class KikimrConfigGenerator(object):
if os.getenv("YDB_ALLOW_ORIGIN") is not None:
self.yaml_config["monitoring_config"] = {"allow_origin": str(os.getenv("YDB_ALLOW_ORIGIN"))}
+ if enforce_user_token_requirement:
+ self.yaml_config["domains_config"]["security_config"]["enforce_user_token_requirement"] = True
+
+ if default_user_sid:
+ self.yaml_config["domains_config"]["security_config"]["default_user_sids"] = [default_user_sid]
+
@property
def pdisks_info(self):
return self._pdisks_info
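
The two new keyword arguments only touch domains_config.security_config; CLUSTER_CONFIG above uses them to run the suite with token enforcement enabled. Their net effect on the generated config (illustrative sketch of the resulting dict):

    # Net effect of enforce_user_token_requirement=True and
    # default_user_sid="user@builtin" on the generated yaml_config:
    security_config = {
        "enforce_user_token_requirement": True,   # reject token-less requests
        "default_user_sids": ["user@builtin"],    # SID attributed when no token
                                                  # is supplied (assumption)
    }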