diff options
author | pixcc <pixcc@yandex-team.com> | 2023-11-27 09:26:32 +0300 |
---|---|---|
committer | pixcc <pixcc@yandex-team.com> | 2023-11-27 09:47:55 +0300 |
commit | e4b7582c62f8047039915487f1815bd56ea81973 (patch) | |
tree | 2dca1d38ee260bb911ded563c87388222a7e3512 | |
parent | c0cdf19cd80fb6fa536681c87a677b39e965ac1a (diff) | |
download | ydb-e4b7582c62f8047039915487f1815bd56ea81973.tar.gz |
Discover dedicated nodes for serverless db KIKIMR-20001
Discover dedicated nodes for serverless db KIKIMR-20001
-rw-r--r-- | ydb/core/discovery/discovery.cpp | 17 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/scheme_cache/scheme_cache.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/scheme_cache/scheme_cache.h | 3 | ||||
-rw-r--r-- | ydb/tests/functional/serverless/conftest.py | 43 | ||||
-rw-r--r-- | ydb/tests/functional/serverless/test_serverless.py | 172 | ||||
-rw-r--r-- | ydb/tests/library/harness/kikimr_config.py | 8 |
7 files changed, 246 insertions, 4 deletions
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index 6e5f4f247c..fcc6f8f105 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -480,7 +480,7 @@ public: } auto info = entry.DomainInfo; - if (info->DomainKey != info->ResourcesDomainKey) { + if (NeedResolveResources(info)) { DLOG_D("Resolve resources domain" << ": domain key# " << info->DomainKey << ", resources domain key# " << info->ResourcesDomainKey); @@ -494,6 +494,21 @@ public: MaybeReply(); } + static bool NeedResolveResources(TIntrusivePtr<NSchemeCache::TDomainInfo> domainInfo) { + if (!domainInfo->IsServerless()) { + return false; + } + + switch (domainInfo->ServerlessComputeResourcesMode) { + case NKikimrSubDomains::SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED: + return false; + case NKikimrSubDomains::SERVERLESS_COMPUTE_RESOURCES_MODE_SHARED: + return true; + default: + return true; + } + } + void MaybeReply() { if (!LookupResponse || !SchemeCacheResponse) { return; diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 099cb36627..6fafbb411c 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -213,10 +213,12 @@ private: if (database->SchemeBoardResult) { const auto& domain = database->SchemeBoardResult->DescribeSchemeResult.GetPathDescription().GetDomainDescription(); if (domain.HasResourcesDomainKey() && !skipResourceCheck && DynamicNode) { - TSubDomainKey subdomainKey(domain.GetResourcesDomainKey()); - if (!SubDomainKeys.contains(subdomainKey)) { + const TSubDomainKey resourceDomainKey(domain.GetResourcesDomainKey()); + const TSubDomainKey domainKey(domain.GetDomainKey()); + if (!SubDomainKeys.contains(resourceDomainKey) && !SubDomainKeys.contains(domainKey)) { TStringBuilder error; error << "Unexpected node to perform query on database: " << databaseName + << ", domain: " << domain.GetDomainKey().ShortDebugString() << ", resource domain: " << domain.GetResourcesDomainKey().ShortDebugString(); LOG_ERROR(ctx, NKikimrServices::GRPC_SERVER, error); auto issue = MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error); diff --git a/ydb/core/tx/scheme_cache/scheme_cache.cpp b/ydb/core/tx/scheme_cache/scheme_cache.cpp index 7c867beb27..2a7bbaf541 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.cpp +++ b/ydb/core/tx/scheme_cache/scheme_cache.cpp @@ -31,6 +31,7 @@ TString TDomainInfo::ToString() const { << " DomainKey: " << DomainKey << " ResourcesDomainKey: " << ResourcesDomainKey << " Params { " << Params.ShortDebugString() << " }" + << " ServerlessComputeResourcesMode: " << ServerlessComputeResourcesMode << " }"; } diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index 138fc65f9c..cd7ab0f917 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -55,6 +55,7 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> { : DomainKey(GetDomainKey(descr.GetDomainKey())) , Params(descr.GetProcessingParams()) , Coordinators(descr.GetProcessingParams()) + , ServerlessComputeResourcesMode(descr.GetServerlessComputeResourcesMode()) { if (descr.HasResourcesDomainKey()) { ResourcesDomainKey = GetDomainKey(descr.GetResourcesDomainKey()); @@ -83,6 +84,8 @@ struct TDomainInfo : public TAtomicRefCount<TDomainInfo> { TPathId ResourcesDomainKey; NKikimrSubDomains::TProcessingParams Params; TCoordinators Coordinators; + NKikimrSubDomains::EServerlessComputeResourcesMode ServerlessComputeResourcesMode = + NKikimrSubDomains::SERVERLESS_COMPUTE_RESOURCES_MODE_UNSPECIFIED; TString ToString() const; diff --git a/ydb/tests/functional/serverless/conftest.py b/ydb/tests/functional/serverless/conftest.py index 0566c69458..314f5f0f43 100644 --- a/ydb/tests/functional/serverless/conftest.py +++ b/ydb/tests/functional/serverless/conftest.py @@ -116,3 +116,46 @@ def ydb_disk_quoted_serverless_db(ydb_cluster, ydb_root, ydb_hostel_db, ydb_safe with ydb_serverless_db_ctx(ydb_cluster, database_name, ydb_hostel_db, disk_quotas=disk_quotas): yield database_name + + +@contextlib.contextmanager +def ydb_serverless_db_with_nodes_ctx(ydb_cluster, database, hostel_db, timeout_seconds=100): + logger.info("setup ydb_serverless_db_with_nodes %s using hostel %s", database, hostel_db) + + ydb_cluster.remove_database( + database, + timeout_seconds=timeout_seconds + ) + + ydb_cluster.create_serverless_database( + database, + hostel_db=hostel_db, + timeout_seconds=timeout_seconds, + attributes={ + "cloud_id": "CLOUD_ID_VAL", + "folder_id": "FOLDER_ID_VAL", + "database_id": "DATABASE_ID_VAL", + }, + ) + + database_nodes = ydb_cluster.register_and_start_slots(database, 3) + ydb_cluster.wait_tenant_up(database) + + try: + yield database + finally: + logger.info("destroy ydb_serverless_db_with_nodes for %s", database) + ydb_cluster.remove_database( + database, + timeout_seconds=timeout_seconds + ) + + ydb_cluster.unregister_and_stop_slots(database_nodes) + + +@pytest.fixture(scope='module') +def ydb_serverless_db_with_nodes(ydb_cluster, ydb_root, ydb_hostel_db): + database_name = os.path.join(ydb_root, "serverless_with_nodes") + + with ydb_serverless_db_with_nodes_ctx(ydb_cluster, database_name, ydb_hostel_db): + yield database_name diff --git a/ydb/tests/functional/serverless/test_serverless.py b/ydb/tests/functional/serverless/test_serverless.py index fefe1a2d77..4c978b10f0 100644 --- a/ydb/tests/functional/serverless/test_serverless.py +++ b/ydb/tests/functional/serverless/test_serverless.py @@ -5,8 +5,9 @@ import os import time import copy import pytest +import subprocess -from hamcrest import assert_that, contains_inanyorder, not_none +from hamcrest import assert_that, contains_inanyorder, not_none, not_, only_contains, is_in from tornado import gen from tornado.ioloop import IOLoop @@ -42,7 +43,11 @@ CLUSTER_CONFIG = dict( 'CMS': LogLevels.DEBUG, 'CMS_TENANTS': LogLevels.DEBUG, + 'DISCOVERY': LogLevels.TRACE, + 'GRPC_SERVER': LogLevels.DEBUG }, + enforce_user_token_requirement=True, + default_user_sid="user@builtin" ) @@ -469,3 +474,168 @@ def test_discovery(ydb_hostel_db, ydb_serverless_db, ydb_endpoint): assert_that(hostel_db_endpoints, not_none()) assert_that(serverless_db_endpoints, not_none()) assert_that(serverless_db_endpoints, contains_inanyorder(*hostel_db_endpoints)) + + +def ydbcli_db_schema_exec(cluster, operation_proto): + endpoint = f'{cluster.nodes[1].host}:{cluster.nodes[1].port}' + args = [ + cluster.nodes[1].binary_path, + f'--server=grpc://{endpoint}', + 'db', + 'schema', + 'exec', + operation_proto, + ] + r = subprocess.run(args, capture_output=True) + assert r.returncode == 0, r.stderr.decode('utf-8') + + +def alter_database_serverless_compute_resources_mode(cluster, database_path, serverless_compute_resources_mode): + alter_proto = r'''ModifyScheme { + OperationType: ESchemeOpAlterExtSubDomain + WorkingDir: "%s" + SubDomain { + Name: "%s" + ServerlessComputeResourcesMode: %s + } + }''' % ( + os.path.dirname(database_path), + os.path.basename(database_path), + serverless_compute_resources_mode + ) + + ydbcli_db_schema_exec(cluster, alter_proto) + + +def test_discovery_dedicated_nodes(ydb_hostel_db, ydb_serverless_db_with_nodes, ydb_endpoint, ydb_cluster): + def list_endpoints(database): + logger.debug("List endpoints of %s", database) + + driver_config = ydb.DriverConfig(ydb_endpoint, database) + with ydb.Driver(driver_config) as driver: + driver.wait(120) + + resolver = ydb.DiscoveryEndpointsResolver(driver_config) + result = resolver.resolve() + if result is not None: + return result.endpoints + return result + + alter_database_serverless_compute_resources_mode( + ydb_cluster, + ydb_serverless_db_with_nodes, + "SERVERLESS_COMPUTE_RESOURCES_MODE_SHARED" + ) + serverless_db_shared_endpoints = list_endpoints(ydb_serverless_db_with_nodes) + hostel_db_endpoints = list_endpoints(ydb_hostel_db) + + assert_that(hostel_db_endpoints, not_none()) + assert_that(serverless_db_shared_endpoints, not_none()) + assert_that(serverless_db_shared_endpoints, contains_inanyorder(*hostel_db_endpoints)) + + alter_database_serverless_compute_resources_mode( + ydb_cluster, + ydb_serverless_db_with_nodes, + "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED" + ) + serverless_db_dedicated_endpoints = list_endpoints(ydb_serverless_db_with_nodes) + + assert_that(serverless_db_dedicated_endpoints, not_none()) + assert_that(serverless_db_dedicated_endpoints, only_contains(not_(is_in(serverless_db_shared_endpoints)))) + + +def test_create_table_using_dedicated_nodes(ydb_hostel_db, ydb_serverless_db_with_nodes, ydb_endpoint, ydb_cluster): + alter_database_serverless_compute_resources_mode( + ydb_cluster, + ydb_serverless_db_with_nodes, + "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED" + ) + + database = ydb_serverless_db_with_nodes + driver_config = ydb.DriverConfig(ydb_endpoint, database) + driver = ydb.Driver(driver_config) + driver.wait(120) + + dir_path = os.path.join(database, "dir") + driver.scheme_client.make_directory(dir_path) + + with ydb.SessionPool(driver) as pool: + def create_table(session, path): + session.create_table( + path, + ydb.TableDescription() + .with_column(ydb.Column("id", ydb.OptionalType(ydb.DataType.Uint64))) + .with_primary_key("id") + ) + + def write_some_data(session, path): + session.transaction().execute( + f"UPSERT INTO `{path}` (id) VALUES (1), (2), (3);", + commit_tx=True) + + def drop_table(session, path): + session.drop_table( + path + ) + + table_path = os.path.join(dir_path, "create_table_with_dedicated_nodes_table") + pool.retry_operation_sync(create_table, None, table_path) + pool.retry_operation_sync(write_some_data, None, table_path) + pool.retry_operation_sync(drop_table, None, table_path) + + +def test_seamless_migration_to_dedicated_nodes(ydb_hostel_db, ydb_serverless_db_with_nodes, ydb_endpoint, ydb_cluster): + alter_database_serverless_compute_resources_mode( + ydb_cluster, + ydb_serverless_db_with_nodes, + "SERVERLESS_COMPUTE_RESOURCES_MODE_SHARED" + ) + + database = ydb_serverless_db_with_nodes + driver_config = ydb.DriverConfig(ydb_endpoint, database) + driver = ydb.Driver(driver_config) + driver.wait(120) + + session = driver.table_client.session().create() + path = os.path.join(database, "seamless_migration_table") + session.create_table( + path, + ydb.TableDescription() + .with_column(ydb.Column("id", ydb.OptionalType(ydb.DataType.Uint64))) + .with_primary_key("id") + ) + + session.transaction().execute( + f"UPSERT INTO `{path}` (id) VALUES (1), (2), (3);", + commit_tx=True + ) + + alter_database_serverless_compute_resources_mode( + ydb_cluster, + ydb_serverless_db_with_nodes, + "SERVERLESS_COMPUTE_RESOURCES_MODE_DEDICATED" + ) + + # Old session keeps work fine with old connections to shared nodes + session.transaction().execute( + f"UPSERT INTO `{path}` (id) VALUES (4), (5), (6);", + commit_tx=True + ) + + # Force rediscovery + newDriver = ydb.Driver(driver_config) + newDriver.wait(120) + session._driver = newDriver + + # Old session works fine with new connections to dedicated nodes + session.transaction().execute( + f"UPSERT INTO `{path}` (id) VALUES (7), (8), (9);", + commit_tx=True + ) + + # New session works fine + newSession = newDriver.table_client.session().create() + newSession.transaction().execute( + f"UPSERT INTO `{path}` (id) VALUES (10), (11), (12);", + commit_tx=True + ) diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index e5204302d5..41b409d66b 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -158,6 +158,8 @@ class KikimrConfigGenerator(object): extra_feature_flags=None, # list[str] extra_grpc_services=None, # list[str] hive_config=None, + enforce_user_token_requirement=False, + default_user_sid=None ): if extra_feature_flags is None: extra_feature_flags = [] @@ -366,6 +368,12 @@ class KikimrConfigGenerator(object): if os.getenv("YDB_ALLOW_ORIGIN") is not None: self.yaml_config["monitoring_config"] = {"allow_origin": str(os.getenv("YDB_ALLOW_ORIGIN"))} + if enforce_user_token_requirement: + self.yaml_config["domains_config"]["security_config"]["enforce_user_token_requirement"] = True + + if default_user_sid: + self.yaml_config["domains_config"]["security_config"]["default_user_sids"] = [default_user_sid] + @property def pdisks_info(self): return self._pdisks_info |