| author | Oleg <[email protected]> | 2025-10-24 05:08:48 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-24 05:08:48 +0300 |
| commit | 4d080588d79a43a26a7c1bd427cfa3773841f804 (patch) | |
| tree | 3da521cf1655c489ac70644d565206dd4e66ea5f | |
| parent | 564cf7cb2255a107b4f44c18b2a1844041f20b4d (diff) | |
Full restore test with complex schema change (#27202)
| -rw-r--r-- | ydb/tests/functional/backup_collection/basic_user_scenarios.py | 427 |
1 file changed, 427 insertions, 0 deletions
diff --git a/ydb/tests/functional/backup_collection/basic_user_scenarios.py b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
index 5f6a3b95a3b..4d0dec965f3 100644
--- a/ydb/tests/functional/backup_collection/basic_user_scenarios.py
+++ b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
@@ -7,6 +7,7 @@ import yatest
 import pytest
 import tempfile
 import re
+import uuid
 
 from typing import List
 from ydb.tests.library.harness.kikimr_runner import KiKiMR
@@ -577,6 +578,95 @@ class BaseTestBackupInFiles(object):
             "last_success": last_success,
         }
 
+    def _rearrange_table(self, from_name: str, to_name: str):
+        full_from = f"/Root/{from_name}"
+        full_to = f"/Root/{to_name}"
+
+        def run_cli(args):
+            cmd = [
+                backup_bin(),
+                "--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}",
+                "--database", self.root_dir,
+            ] + args
+            return yatest.common.execute(cmd, check_exit_code=False)
+
+        def to_rel(p):
+            if p.startswith(self.root_dir + "/"):
+                return p[len(self.root_dir) + 1 :]
+            if p == self.root_dir:
+                return ""
+            return p.lstrip("/")
+
+        src_rel = to_rel(full_from)
+        dst_rel = to_rel(full_to)
+
+        parent = os.path.dirname(dst_rel)
+        parent_full = os.path.join(self.root_dir, parent) if parent else None
+        if parent and parent_full:
+            self.driver.scheme_client.list_directory(parent_full)
+
+            mkdir_res = run_cli(["scheme", "mkdir", parent])
+            if mkdir_res.exit_code != 0:
+                logger.debug("scheme mkdir parent returned code=%s stdout=%s stderr=%s",
+                             mkdir_res.exit_code,
+                             getattr(mkdir_res, "std_out", b"").decode("utf-8", "ignore"),
+                             getattr(mkdir_res, "std_err", b"").decode("utf-8", "ignore"))
+
+        # perform tools copy via CLI to the requested destination
+        item_arg = f"destination={dst_rel},source={src_rel}"
+        res = run_cli(["tools", "copy", "--item", item_arg])
+        if res.exit_code != 0:
+            out = (res.std_out or b"").decode("utf-8", "ignore")
+            err = (res.std_err or b"").decode("utf-8", "ignore")
+            raise AssertionError(f"tools copy failed: from={full_from} to={full_to} code={res.exit_code} STDOUT: {out} STDERR: {err}")
+
+        tmp_dir_rel = f"tmp_rearr_{int(time.time())}"
+        tmp_full = os.path.join(self.root_dir, tmp_dir_rel)
+        tmp_created = False
+        try:
+            # create temporary directory
+            try:
+                self.driver.scheme_client.list_directory(tmp_full)
+                tmp_created = False
+            except Exception:
+                res_mk = run_cli(["scheme", "mkdir", tmp_dir_rel])
+                if res_mk.exit_code != 0:
+                    logger.debug("tmp dir mkdir returned code=%s stdout=%s stderr=%s",
+                                 res_mk.exit_code,
+                                 getattr(res_mk, "std_out", b"").decode("utf-8", "ignore"),
+                                 getattr(res_mk, "std_err", b"").decode("utf-8", "ignore"))
+                else:
+                    tmp_created = True
+
+            # copy to temporary dir (basename of destination)
+            basename = os.path.basename(dst_rel) or to_rel(full_from)
+            item_tmp = f"destination={tmp_dir_rel}/{basename},source={src_rel}"
+            res_tmp = run_cli(["tools", "copy", "--item", item_tmp])
+            if res_tmp.exit_code != 0:
+                logger.warning("tools copy to tmp dir failed: code=%s stdout=%s stderr=%s",
+                               res_tmp.exit_code,
+                               getattr(res_tmp, "std_out", b"").decode("utf-8", "ignore"),
+                               getattr(res_tmp, "std_err", b"").decode("utf-8", "ignore"))
+            else:
+                # list temporary dir contents via scheme_client for visibility
+                try:
+                    desc = self.driver.scheme_client.list_directory(tmp_full)
+                    names = [c.name for c in desc.children if not is_system_object(c)]
+                    logger.info("Temporary directory %s contents: %s", tmp_full, names)
+                except Exception as e:
+                    logger.info("Failed to list tmp dir %s: %s", tmp_full, e)
+
+        finally:
+            # remove temporary directory if we created it here
+            if tmp_created:
+                rmdir_res = run_cli(["scheme", "rmdir", "-rf", tmp_dir_rel])
+                if rmdir_res.exit_code != 0:
+                    logger.warning("Failed to rmdir tmp dir %s: code=%s stdout=%s stderr=%s",
+                                   tmp_dir_rel,
+                                   rmdir_res.exit_code,
+                                   getattr(rmdir_res, "std_out", b"").decode("utf-8", "ignore"),
+                                   getattr(rmdir_res, "std_err", b"").decode("utf-8", "ignore"))
+
 
 class TestFullCycleLocalBackupRestore(BaseTestBackupInFiles):
     def _execute_yql(self, script, verbose=False):
@@ -1683,3 +1773,340 @@ class TestIncrementalChainRestoreAfterDeletion(TestFullCycleLocalBackupRestore):
 
         if os.path.exists(export_dir):
             shutil.rmtree(export_dir)
+
+
+class TestFullCycleLocalBackupRestoreWComplSchemaChange(TestFullCycleLocalBackupRestoreWSchemaChange):
+    def _decode_output(self, result) -> tuple:
+        """Helper to decode stdout and stderr from execution result"""
+        stdout = getattr(result, "std_out", b"").decode("utf-8", "ignore")
+        stderr = getattr(result, "std_err", b"").decode("utf-8", "ignore")
+        return stdout, stderr
+
+    def _safe_capture_snapshot(self, table: str, default=None):
+        """Safely capture table snapshot, returning default on failure"""
+        try:
+            return self._capture_snapshot(table)
+        except Exception as e:
+            logger.debug(f"Failed to capture snapshot for {table}: {e}")
+            return default
+
+    def _safe_capture_schema(self, table_path: str, default=None):
+        """Safely capture table schema, returning default on failure"""
+        try:
+            return self._capture_schema(table_path)
+        except Exception as e:
+            logger.debug(f"Failed to capture schema for {table_path}: {e}")
+            return default
+
+    def _safe_capture_acl(self, table_path: str, default=None):
+        """Safely capture table ACL, returning default on failure"""
+        try:
+            return self._capture_acl(table_path)
+        except Exception as e:
+            logger.debug(f"Failed to capture ACL for {table_path}: {e}")
+            return default
+
+    def _table_exists(self, table_path: str) -> bool:
+        """Check if table exists"""
+        try:
+            self.driver.scheme_client.describe_path(table_path)
+            return True
+        except Exception:
+            return False
+
+    def _copy_table(self, from_name: str, to_name: str):
+        full_from = f"/Root/{from_name}"
+        full_to = f"/Root/{to_name}"
+
+        def run_cli(args):
+            cmd = [
+                backup_bin(),
+                "--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}",
+                "--database", self.root_dir,
+            ] + args
+            return yatest.common.execute(cmd, check_exit_code=False)
+
+        def to_rel(p):
+            if p.startswith(self.root_dir + "/"):
+                return p[len(self.root_dir) + 1:]
+            if p == self.root_dir:
+                return ""
+            return p.lstrip("/")
+
+        src_rel = to_rel(full_from)
+        dst_rel = to_rel(full_to)
+
+        # Create parent directory if needed
+        parent = os.path.dirname(dst_rel)
+        parent_full = os.path.join(self.root_dir, parent) if parent else None
+        if parent and parent_full:
+            mkdir_res = run_cli(["scheme", "mkdir", parent])
+            if mkdir_res.exit_code != 0:
+                stdout, stderr = self._decode_output(mkdir_res)
+                logger.debug("scheme mkdir parent returned code=%s stdout=%s stderr=%s",
+                             mkdir_res.exit_code, stdout, stderr)
+
+        # Perform copy
+        item_arg = f"destination={dst_rel},source={src_rel}"
+        res = run_cli(["tools", "copy", "--item", item_arg])
+        if res.exit_code != 0:
+            stdout, stderr = self._decode_output(res)
+            raise AssertionError(f"tools copy failed: from={full_from} to={full_to} code={res.exit_code} STDOUT: {stdout} STDERR: {stderr}")
+
+        # Create temporary directory for additional operations
+        tmp_dir_rel = f"tmp_copy_{uuid.uuid4().hex[:8]}"
+        tmp_full = os.path.join(self.root_dir, tmp_dir_rel)
+
+        try:
+            # Create temp dir
+            res_mk = run_cli(["scheme", "mkdir", tmp_dir_rel])
+            if res_mk.exit_code != 0:
+                stdout, stderr = self._decode_output(res_mk)
+                logger.debug("tmp dir mkdir returned code=%s stdout=%s stderr=%s", res_mk.exit_code, stdout, stderr)
+
+            # Copy to temporary dir for verification
+            basename = os.path.basename(dst_rel) or to_rel(full_from)
+            item_tmp = f"destination={tmp_dir_rel}/{basename},source={src_rel}"
+            res_tmp = run_cli(["tools", "copy", "--item", item_tmp])
+            if res_tmp.exit_code != 0:
+                stdout, stderr = self._decode_output(res_tmp)
+                logger.warning("tools copy to tmp dir failed: code=%s stdout=%s stderr=%s", res_tmp.exit_code, stdout, stderr)
+            else:
+                # List temporary dir contents for visibility
+                try:
+                    desc = self.driver.scheme_client.list_directory(tmp_full)
+                    names = [c.name for c in desc.children if not is_system_object(c)]
+                    logger.info("Temporary directory %s contents: %s", tmp_full, names)
+                except Exception as e:
+                    logger.info("Failed to list tmp dir %s: %s", tmp_full, e)
+
+        finally:
+            # Always cleanup temp directory
+            rmdir_res = run_cli(["scheme", "rmdir", "-rf", tmp_dir_rel])
+            if rmdir_res.exit_code != 0:
+                stdout, stderr = self._decode_output(rmdir_res)
+                logger.warning("Failed to cleanup tmp dir %s: code=%s stdout=%s stderr=%s", tmp_dir_rel, rmdir_res.exit_code, stdout, stderr)
+
+        # drop old
+        # try:
+        #     with self.session_scope() as session:
+        #         session.execute_scheme(f"DROP TABLE `{from_name}`;")
+        # except Exception:
+        #     raise AssertionError(f"Failed to drop original table {full_from} after rearrange")
+
+    def test_full_cycle_local_backup_restore_with_complex_schema_changes(self):
+        # Define table names and paths as constants
+        ORDERS_TABLE = "orders"
+        PRODUCTS_TABLE = "products"
+        ORDERS_COPY_TABLE = "orders_copy"
+        EXTRA_TABLE_1 = "extra_table_1"
+        EXTRA_TABLE_2 = "extra_table_2"
+        OTHER_PLACE_TABLE = "other_place_topic"
+
+        ORDERS_PATH = f"/Root/{ORDERS_TABLE}"
+        PRODUCTS_PATH = f"/Root/{PRODUCTS_TABLE}"
+        ORDERS_COPY_PATH = f"/Root/{ORDERS_COPY_TABLE}"
+        EXTRA_TABLE_1_PATH = f"/Root/{EXTRA_TABLE_1}"
+        EXTRA_TABLE_2_PATH = f"/Root/{EXTRA_TABLE_2}"
+
+        # Preparations: create source collection and initial tables
+        collection_src = f"coll_src_{uuid.uuid4().hex[:8]}"
+
+        with self.session_scope() as session:
+            create_table_with_data(session, ORDERS_TABLE)
+            create_table_with_data(session, PRODUCTS_TABLE)
+
+        # Create backup collection referencing the tables
+        self._create_backup_collection(collection_src, [ORDERS_TABLE, PRODUCTS_TABLE])
+
+        # === Stage 1: add/remove data, change ACLs, add more tables ===
+        with self.session_scope() as session:
+            # Data modifications
+            session.transaction().execute(
+                f'PRAGMA TablePathPrefix("/Root"); UPSERT INTO {ORDERS_TABLE} (id, number, txt) VALUES (10, 100, "one-stage");',
+                commit_tx=True
+            )
+            session.transaction().execute(
+                f'PRAGMA TablePathPrefix("/Root"); DELETE FROM {PRODUCTS_TABLE} WHERE id = 1;',
+                commit_tx=True
+            )
+
+            # Change ACLs
+            desc_for_acl = self.driver.scheme_client.describe_path(ORDERS_PATH)
+            owner_role = getattr(desc_for_acl, "owner", None) or "root@builtin"
+            role_candidates = ["public", owner_role]
+            acl_applied = False
+            for r in role_candidates:
+                cmd = f"GRANT SELECT ON `{ORDERS_PATH}` TO `{r.replace('`', '')}`;"
+                res = self._execute_yql(cmd)
+                if res.exit_code == 0:
+                    acl_applied = True
+                    break
+            assert acl_applied, "Failed to apply any GRANT variant in stage 1"
+
+            # Add extra table
+            create_table_with_data(session, EXTRA_TABLE_1)
+
+        # Capture state after stage 1
+        assert self._table_exists(ORDERS_PATH), f"{ORDERS_TABLE} should exist after stage 1"
+        assert self._table_exists(PRODUCTS_PATH), f"{PRODUCTS_TABLE} should exist after stage 1"
+        assert self._table_exists(EXTRA_TABLE_1_PATH), f"{EXTRA_TABLE_1} should exist after stage 1"
+
+        snapshot_stage1_t1 = self._capture_snapshot(ORDERS_TABLE)
+        snapshot_stage1_t2 = self._capture_snapshot(PRODUCTS_TABLE)
+        schema_stage1_t1 = self._capture_schema(ORDERS_PATH)
+        schema_stage1_t2 = self._capture_schema(PRODUCTS_PATH)
+        acl_stage1_t1 = self._capture_acl(ORDERS_PATH)
+        acl_stage1_t2 = self._capture_acl(PRODUCTS_PATH)
+
+        # Create full backup 1
+        self._backup_now(collection_src)
+        self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)
+
+        # === Stage 2: add/remove data, add more tables, remove tables, alter schema ===
+        with self.session_scope() as session:
+            # Data changes
+            session.transaction().execute(
+                f'PRAGMA TablePathPrefix("/Root"); UPSERT INTO {ORDERS_TABLE} (id, number, txt) VALUES (11, 111, "two-stage");',
+                commit_tx=True
+            )
+            session.transaction().execute(
+                f'PRAGMA TablePathPrefix("/Root"); DELETE FROM {ORDERS_TABLE} WHERE id = 2;',
+                commit_tx=True
+            )
+
+            # Add extra table 2
+            create_table_with_data(session, EXTRA_TABLE_2)
+
+            # Remove extra_table_1
+            session.execute_scheme(f'DROP TABLE `{EXTRA_TABLE_1_PATH}`;')
+
+            # Add columns to initial tables
+            session.execute_scheme(f'ALTER TABLE `{ORDERS_PATH}` ADD COLUMN new_col Uint32;')
+
+            # Try to drop a column (may not be supported)
+            try:
+                session.execute_scheme(f'ALTER TABLE `{ORDERS_PATH}` DROP COLUMN number;')
+            except Exception:
+                logger.info("DROP COLUMN number failed or unsupported — continuing")
+
+            # Change ACLs again
+            desc_for_acl2 = self.driver.scheme_client.describe_path(ORDERS_PATH)
+            owner_role2 = getattr(desc_for_acl2, "owner", None) or "root@builtin"
+            cmd = f"GRANT SELECT ON `{ORDERS_PATH}` TO `{owner_role2.replace('`', '')}`;"
+            res = self._execute_yql(cmd)
+            assert res.exit_code == 0, "Failed to apply GRANT in stage 2"
+
+            # Copy table to new location
+            self._copy_table(ORDERS_TABLE, ORDERS_COPY_TABLE)
+
+            # Create another table
+            create_table_with_data(session, OTHER_PLACE_TABLE)
+
+        # Verify state after stage 2
+        # assert self._table_exists(ORDERS_PATH), f"{ORDERS_TABLE} should exist after stage 2"
+        assert self._table_exists(ORDERS_COPY_PATH), f"{ORDERS_COPY_TABLE} should exist after copy"
+        assert self._table_exists(PRODUCTS_PATH), f"{PRODUCTS_TABLE} should exist after stage 2"
+        assert not self._table_exists(EXTRA_TABLE_1_PATH), f"{EXTRA_TABLE_1} should be dropped"
+        assert self._table_exists(EXTRA_TABLE_2_PATH), f"{EXTRA_TABLE_2} should exist after stage 2"
+
+        # Capture state after stage 2
+        snapshot_stage2_t1 = self._capture_snapshot(ORDERS_COPY_TABLE)
+        snapshot_stage2_t2 = self._capture_snapshot(PRODUCTS_TABLE)
+        schema_stage2_t2 = self._capture_schema(PRODUCTS_PATH)
+        acl_stage2_t1 = self._capture_acl(ORDERS_COPY_PATH)
+        acl_stage2_t2 = self._capture_acl(PRODUCTS_PATH)
+
+        # Create full backup 2
+        self._backup_now(collection_src)
+        self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)
+
+        # Export backups for verification
+        export_dir, exported_items = self._export_backups(collection_src)
+        assert len(exported_items) >= 2, "Expected at least 2 exported snapshots for verification"
+
+        # Create restore collections and import exported snapshots
+        coll_restore_1 = f"coll_restore_v1_{uuid.uuid4().hex[:8]}"
+        coll_restore_2 = f"coll_restore_v2_{uuid.uuid4().hex[:8]}"
+
+        self._create_backup_collection(coll_restore_1, [ORDERS_TABLE, PRODUCTS_TABLE])
+        self._create_backup_collection(coll_restore_2, [ORDERS_TABLE, PRODUCTS_TABLE])
+
+        self._restore_import(export_dir, exported_items[0], coll_restore_1)
+        self._restore_import(export_dir, exported_items[1], coll_restore_2)
+
+        # Try RESTORE when tables exist -> expect fail
+        res_restore_when_exists = self._execute_yql(f"RESTORE `{coll_restore_2}`;")
+        assert res_restore_when_exists.exit_code != 0, "Expected RESTORE to fail when target tables already exist"
+
+        # Remove all tables
+        tables_to_drop = [ORDERS_TABLE, PRODUCTS_TABLE, ORDERS_COPY_TABLE, EXTRA_TABLE_2, OTHER_PLACE_TABLE]
+        self._drop_tables(tables_to_drop)
+
+        # === Restore backup 1 and verify ===
+        res_restore1 = self._execute_yql(f"RESTORE `{coll_restore_1}`;")
+        stdout1, stderr1 = self._decode_output(res_restore1)
+        assert res_restore1.exit_code == 0, f"RESTORE v1 failed: stdout={stdout1} stderr={stderr1}"
+
+        # Verify data/schema/acl for backup1
+        self._verify_restored_table_data(ORDERS_TABLE, snapshot_stage1_t1)
+        self._verify_restored_table_data(PRODUCTS_TABLE, snapshot_stage1_t2)
+
+        restored_schema_t1 = self._capture_schema(ORDERS_PATH)
+        restored_schema_t2 = self._capture_schema(PRODUCTS_PATH)
+        assert restored_schema_t1 == schema_stage1_t1, f"Schema for {ORDERS_TABLE} after restore v1 differs"
+        assert restored_schema_t2 == schema_stage1_t2, f"Schema for {PRODUCTS_TABLE} after restore v1 differs"
+
+        restored_acl_t1 = self._capture_acl(ORDERS_PATH)
+        restored_acl_t2 = self._capture_acl(PRODUCTS_PATH)
+        if 'show_grants' in (acl_stage1_t1 or {}):
+            assert 'show_grants' in (restored_acl_t1 or {}) and acl_stage1_t1['show_grants'] in restored_acl_t1['show_grants']
+        if 'show_grants' in (acl_stage1_t2 or {}):
+            assert 'show_grants' in (restored_acl_t2 or {}) and acl_stage1_t2['show_grants'] in restored_acl_t2['show_grants']
+
+        # Remove all tables again
+        self._drop_tables([ORDERS_TABLE, PRODUCTS_TABLE])
+
+        # === Restore backup 2 and verify ===
+        res_restore2 = self._execute_yql(f"RESTORE `{coll_restore_2}`;")
+        stdout2, stderr2 = self._decode_output(res_restore2)
+        assert res_restore2.exit_code == 0, f"RESTORE v2 failed: stdout={stdout2} stderr={stderr2}"
+
+        # Verify data/schema/acl for backup2
+        # Check if copied table exists
+        if self._table_exists(ORDERS_PATH):
+            self._verify_restored_table_data(ORDERS_TABLE, snapshot_stage2_t1)
+        elif self._table_exists(ORDERS_COPY_PATH):
+            self._verify_restored_table_data(ORDERS_COPY_TABLE, snapshot_stage2_t1)
+        else:
+            raise AssertionError("Neither orders nor orders_copy table exists after restore v2")
+
+        self._verify_restored_table_data(PRODUCTS_TABLE, snapshot_stage2_t2)
+
+        # Verify schema for v2
+        if self._table_exists(ORDERS_PATH):
+            restored_schema2_t1 = self._capture_schema(ORDERS_PATH)
+        else:
+            restored_schema2_t1 = self._capture_schema(ORDERS_COPY_PATH)
+
+        restored_schema2_t2 = self._capture_schema(PRODUCTS_PATH)
+
+        # Schema may differ due to ALTER operations, so just verify presence
+        assert restored_schema2_t1 is not None, "Should have schema for orders table after restore v2"
+        assert restored_schema2_t2 == schema_stage2_t2, f"Schema for {PRODUCTS_TABLE} after restore v2 differs"
+
+        # Verify ACL for v2
+        if self._table_exists(ORDERS_PATH):
+            restored_acl2_t1 = self._capture_acl(ORDERS_PATH)
+        else:
+            restored_acl2_t1 = self._capture_acl(ORDERS_COPY_PATH)
+
+        restored_acl2_t2 = self._capture_acl(PRODUCTS_PATH)
+
+        if 'show_grants' in (acl_stage2_t1 or {}):
+            assert 'show_grants' in (restored_acl2_t1 or {}) and acl_stage2_t1['show_grants'] in restored_acl2_t1['show_grants']
+        if 'show_grants' in (acl_stage2_t2 or {}):
+            assert 'show_grants' in (restored_acl2_t2 or {}) and acl_stage2_t2['show_grants'] in restored_acl2_t2['show_grants']
+
+        # Cleanup
+        if os.path.exists(export_dir):
+            shutil.rmtree(export_dir)
