author     Oleg <[email protected]>    2025-10-24 05:08:48 +0300
committer  GitHub <[email protected]>  2025-10-24 05:08:48 +0300
commit     4d080588d79a43a26a7c1bd427cfa3773841f804 (patch)
tree       3da521cf1655c489ac70644d565206dd4e66ea5f
parent     564cf7cb2255a107b4f44c18b2a1844041f20b4d (diff)
Full restore test with complex schema change (#27202)
-rw-r--r--  ydb/tests/functional/backup_collection/basic_user_scenarios.py  427
1 file changed, 427 insertions, 0 deletions
diff --git a/ydb/tests/functional/backup_collection/basic_user_scenarios.py b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
index 5f6a3b95a3b..4d0dec965f3 100644
--- a/ydb/tests/functional/backup_collection/basic_user_scenarios.py
+++ b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
@@ -7,6 +7,7 @@ import yatest
import pytest
import tempfile
import re
+import uuid
from typing import List
from ydb.tests.library.harness.kikimr_runner import KiKiMR
@@ -577,6 +578,95 @@ class BaseTestBackupInFiles(object):
"last_success": last_success,
}
+ def _rearrange_table(self, from_name: str, to_name: str):
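+        """Copy the table /Root/<from_name> to /Root/<to_name> via `ydb tools copy`.
+
+        The destination's parent directory is created when needed; the source is also
+        copied into a throw-away directory so its contents can be listed for visibility,
+        and that directory is removed afterwards if it was created here.
+        """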
+ full_from = f"/Root/{from_name}"
+ full_to = f"/Root/{to_name}"
+
+ def run_cli(args):
+ cmd = [
+ backup_bin(),
+ "--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}",
+ "--database", self.root_dir,
+ ] + args
+ return yatest.common.execute(cmd, check_exit_code=False)
+
+ def to_rel(p):
+ if p.startswith(self.root_dir + "/"):
+ return p[len(self.root_dir) + 1 :]
+ if p == self.root_dir:
+ return ""
+ return p.lstrip("/")
+
+ src_rel = to_rel(full_from)
+ dst_rel = to_rel(full_to)
+
+ parent = os.path.dirname(dst_rel)
+ parent_full = os.path.join(self.root_dir, parent) if parent else None
+ if parent and parent_full:
+ self.driver.scheme_client.list_directory(parent_full)
+
+ mkdir_res = run_cli(["scheme", "mkdir", parent])
+ if mkdir_res.exit_code != 0:
+ logger.debug("scheme mkdir parent returned code=%s stdout=%s stderr=%s",
+ mkdir_res.exit_code,
+                             (getattr(mkdir_res, "std_out", b"") or b"").decode("utf-8", "ignore"),
+                             (getattr(mkdir_res, "std_err", b"") or b"").decode("utf-8", "ignore"))
+
+ # perform tools copy via CLI to the requested destination
+ item_arg = f"destination={dst_rel},source={src_rel}"
+ res = run_cli(["tools", "copy", "--item", item_arg])
+ if res.exit_code != 0:
+ out = (res.std_out or b"").decode("utf-8", "ignore")
+ err = (res.std_err or b"").decode("utf-8", "ignore")
+ raise AssertionError(f"tools copy failed: from={full_from} to={full_to} code={res.exit_code} STDOUT: {out} STDERR: {err}")
+
+ tmp_dir_rel = f"tmp_rearr_{int(time.time())}"
+ tmp_full = os.path.join(self.root_dir, tmp_dir_rel)
+ tmp_created = False
+ try:
+ # create temporary directory
+ try:
+ self.driver.scheme_client.list_directory(tmp_full)
+ tmp_created = False
+ except Exception:
+ res_mk = run_cli(["scheme", "mkdir", tmp_dir_rel])
+ if res_mk.exit_code != 0:
+ logger.debug("tmp dir mkdir returned code=%s stdout=%s stderr=%s",
+ res_mk.exit_code,
+                                 (getattr(res_mk, "std_out", b"") or b"").decode("utf-8", "ignore"),
+                                 (getattr(res_mk, "std_err", b"") or b"").decode("utf-8", "ignore"))
+ else:
+ tmp_created = True
+
+ # copy to temporary dir (basename of destination)
+ basename = os.path.basename(dst_rel) or to_rel(full_from)
+ item_tmp = f"destination={tmp_dir_rel}/{basename},source={src_rel}"
+ res_tmp = run_cli(["tools", "copy", "--item", item_tmp])
+ if res_tmp.exit_code != 0:
+ logger.warning("tools copy to tmp dir failed: code=%s stdout=%s stderr=%s",
+ res_tmp.exit_code,
+                               (getattr(res_tmp, "std_out", b"") or b"").decode("utf-8", "ignore"),
+                               (getattr(res_tmp, "std_err", b"") or b"").decode("utf-8", "ignore"))
+ else:
+ # list temporary dir contents via scheme_client for visibility
+ try:
+ desc = self.driver.scheme_client.list_directory(tmp_full)
+ names = [c.name for c in desc.children if not is_system_object(c)]
+ logger.info("Temporary directory %s contents: %s", tmp_full, names)
+ except Exception as e:
+ logger.info("Failed to list tmp dir %s: %s", tmp_full, e)
+
+ finally:
+ # remove temporary directory if we created it here
+ if tmp_created:
+ rmdir_res = run_cli(["scheme", "rmdir", "-rf", tmp_dir_rel])
+ if rmdir_res.exit_code != 0:
+ logger.warning("Failed to rmdir tmp dir %s: code=%s stdout=%s stderr=%s",
+ tmp_dir_rel,
+ rmdir_res.exit_code,
+                                   (getattr(rmdir_res, "std_out", b"") or b"").decode("utf-8", "ignore"),
+                                   (getattr(rmdir_res, "std_err", b"") or b"").decode("utf-8", "ignore"))
+
class TestFullCycleLocalBackupRestore(BaseTestBackupInFiles):
def _execute_yql(self, script, verbose=False):
@@ -1683,3 +1773,340 @@ class TestIncrementalChainRestoreAfterDeletion(TestFullCycleLocalBackupRestore):
if os.path.exists(export_dir):
shutil.rmtree(export_dir)
+
+
+class TestFullCycleLocalBackupRestoreWComplSchemaChange(TestFullCycleLocalBackupRestoreWSchemaChange):
+ def _decode_output(self, result) -> tuple:
+ """Helper to decode stdout and stderr from execution result"""
+        stdout = (getattr(result, "std_out", b"") or b"").decode("utf-8", "ignore")
+        stderr = (getattr(result, "std_err", b"") or b"").decode("utf-8", "ignore")
+ return stdout, stderr
+
+ def _safe_capture_snapshot(self, table: str, default=None):
+ """Safely capture table snapshot, returning default on failure"""
+ try:
+ return self._capture_snapshot(table)
+ except Exception as e:
+ logger.debug(f"Failed to capture snapshot for {table}: {e}")
+ return default
+
+ def _safe_capture_schema(self, table_path: str, default=None):
+ """Safely capture table schema, returning default on failure"""
+ try:
+ return self._capture_schema(table_path)
+ except Exception as e:
+ logger.debug(f"Failed to capture schema for {table_path}: {e}")
+ return default
+
+ def _safe_capture_acl(self, table_path: str, default=None):
+ """Safely capture table ACL, returning default on failure"""
+ try:
+ return self._capture_acl(table_path)
+ except Exception as e:
+ logger.debug(f"Failed to capture ACL for {table_path}: {e}")
+ return default
+
+ def _table_exists(self, table_path: str) -> bool:
+ """Check if table exists"""
+ try:
+ self.driver.scheme_client.describe_path(table_path)
+ return True
+ except Exception:
+ return False
+
+ def _copy_table(self, from_name: str, to_name: str):
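+        """Copy the table /Root/<from_name> to /Root/<to_name> via `ydb tools copy`.
+
+        A second copy is made into a uuid-named temporary directory purely for
+        verification/logging; that directory is always removed in the `finally` block.
+        """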
+ full_from = f"/Root/{from_name}"
+ full_to = f"/Root/{to_name}"
+
+ def run_cli(args):
+ cmd = [
+ backup_bin(),
+ "--endpoint", f"grpc://localhost:{self.cluster.nodes[1].grpc_port}",
+ "--database", self.root_dir,
+ ] + args
+ return yatest.common.execute(cmd, check_exit_code=False)
+
+ def to_rel(p):
+ if p.startswith(self.root_dir + "/"):
+ return p[len(self.root_dir) + 1:]
+ if p == self.root_dir:
+ return ""
+ return p.lstrip("/")
+
+ src_rel = to_rel(full_from)
+ dst_rel = to_rel(full_to)
+
+ # Create parent directory if needed
+ parent = os.path.dirname(dst_rel)
+ parent_full = os.path.join(self.root_dir, parent) if parent else None
+ if parent and parent_full:
+ mkdir_res = run_cli(["scheme", "mkdir", parent])
+ if mkdir_res.exit_code != 0:
+ stdout, stderr = self._decode_output(mkdir_res)
+ logger.debug("scheme mkdir parent returned code=%s stdout=%s stderr=%s",
+ mkdir_res.exit_code, stdout, stderr)
+
+ # Perform copy
+ item_arg = f"destination={dst_rel},source={src_rel}"
+ res = run_cli(["tools", "copy", "--item", item_arg])
+ if res.exit_code != 0:
+ stdout, stderr = self._decode_output(res)
+ raise AssertionError(f"tools copy failed: from={full_from} to={full_to} code={res.exit_code} STDOUT: {stdout} STDERR: {stderr}")
+
+ # Create temporary directory for additional operations
+ tmp_dir_rel = f"tmp_copy_{uuid.uuid4().hex[:8]}"
+ tmp_full = os.path.join(self.root_dir, tmp_dir_rel)
+
+ try:
+ # Create temp dir
+ res_mk = run_cli(["scheme", "mkdir", tmp_dir_rel])
+ if res_mk.exit_code != 0:
+ stdout, stderr = self._decode_output(res_mk)
+ logger.debug("tmp dir mkdir returned code=%s stdout=%s stderr=%s", res_mk.exit_code, stdout, stderr)
+
+ # Copy to temporary dir for verification
+ basename = os.path.basename(dst_rel) or to_rel(full_from)
+ item_tmp = f"destination={tmp_dir_rel}/{basename},source={src_rel}"
+ res_tmp = run_cli(["tools", "copy", "--item", item_tmp])
+ if res_tmp.exit_code != 0:
+ stdout, stderr = self._decode_output(res_tmp)
+ logger.warning("tools copy to tmp dir failed: code=%s stdout=%s stderr=%s", res_tmp.exit_code, stdout, stderr)
+ else:
+ # List temporary dir contents for visibility
+ try:
+ desc = self.driver.scheme_client.list_directory(tmp_full)
+ names = [c.name for c in desc.children if not is_system_object(c)]
+ logger.info("Temporary directory %s contents: %s", tmp_full, names)
+ except Exception as e:
+ logger.info("Failed to list tmp dir %s: %s", tmp_full, e)
+
+ finally:
+ # Always cleanup temp directory
+ rmdir_res = run_cli(["scheme", "rmdir", "-rf", tmp_dir_rel])
+ if rmdir_res.exit_code != 0:
+ stdout, stderr = self._decode_output(rmdir_res)
+ logger.warning("Failed to cleanup tmp dir %s: code=%s stdout=%s stderr=%s", tmp_dir_rel, rmdir_res.exit_code, stdout, stderr)
+
+ # drop old
+ # try:
+ # with self.session_scope() as session:
+ # session.execute_scheme(f"DROP TABLE `{from_name}`;")
+ # except Exception:
+ # raise AssertionError(f"Failed to drop original table {full_from} after rearrange")
+
+ def test_full_cycle_local_backup_restore_with_complex_schema_changes(self):
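+        """Two-stage scenario: data edits, ACL changes, table adds/drops, column ALTERs
+        and a table copy, with a full backup taken after each stage; both snapshots are
+        then exported, re-imported into fresh collections and restored in turn, verifying
+        data, schema and ACLs after each restore.
+        """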
+ # Define table names and paths as constants
+ ORDERS_TABLE = "orders"
+ PRODUCTS_TABLE = "products"
+ ORDERS_COPY_TABLE = "orders_copy"
+ EXTRA_TABLE_1 = "extra_table_1"
+ EXTRA_TABLE_2 = "extra_table_2"
+ OTHER_PLACE_TABLE = "other_place_topic"
+
+ ORDERS_PATH = f"/Root/{ORDERS_TABLE}"
+ PRODUCTS_PATH = f"/Root/{PRODUCTS_TABLE}"
+ ORDERS_COPY_PATH = f"/Root/{ORDERS_COPY_TABLE}"
+ EXTRA_TABLE_1_PATH = f"/Root/{EXTRA_TABLE_1}"
+ EXTRA_TABLE_2_PATH = f"/Root/{EXTRA_TABLE_2}"
+
+ # Preparations: create source collection and initial tables
+ collection_src = f"coll_src_{uuid.uuid4().hex[:8]}"
+
+ with self.session_scope() as session:
+ create_table_with_data(session, ORDERS_TABLE)
+ create_table_with_data(session, PRODUCTS_TABLE)
+
+ # Create backup collection referencing the tables
+ self._create_backup_collection(collection_src, [ORDERS_TABLE, PRODUCTS_TABLE])
+
+ # === Stage 1: add/remove data, change ACLs, add more tables ===
+ with self.session_scope() as session:
+ # Data modifications
+ session.transaction().execute(
+ f'PRAGMA TablePathPrefix("/Root"); UPSERT INTO {ORDERS_TABLE} (id, number, txt) VALUES (10, 100, "one-stage");',
+ commit_tx=True
+ )
+ session.transaction().execute(
+ f'PRAGMA TablePathPrefix("/Root"); DELETE FROM {PRODUCTS_TABLE} WHERE id = 1;',
+ commit_tx=True
+ )
+
+ # Change ACLs
+ desc_for_acl = self.driver.scheme_client.describe_path(ORDERS_PATH)
+ owner_role = getattr(desc_for_acl, "owner", None) or "root@builtin"
+ role_candidates = ["public", owner_role]
+ acl_applied = False
+ for r in role_candidates:
+ cmd = f"GRANT SELECT ON `{ORDERS_PATH}` TO `{r.replace('`', '')}`;"
+ res = self._execute_yql(cmd)
+ if res.exit_code == 0:
+ acl_applied = True
+ break
+ assert acl_applied, "Failed to apply any GRANT variant in stage 1"
+
+ # Add extra table
+ create_table_with_data(session, EXTRA_TABLE_1)
+
+ # Capture state after stage 1
+ assert self._table_exists(ORDERS_PATH), f"{ORDERS_TABLE} should exist after stage 1"
+ assert self._table_exists(PRODUCTS_PATH), f"{PRODUCTS_TABLE} should exist after stage 1"
+ assert self._table_exists(EXTRA_TABLE_1_PATH), f"{EXTRA_TABLE_1} should exist after stage 1"
+
+ snapshot_stage1_t1 = self._capture_snapshot(ORDERS_TABLE)
+ snapshot_stage1_t2 = self._capture_snapshot(PRODUCTS_TABLE)
+ schema_stage1_t1 = self._capture_schema(ORDERS_PATH)
+ schema_stage1_t2 = self._capture_schema(PRODUCTS_PATH)
+ acl_stage1_t1 = self._capture_acl(ORDERS_PATH)
+ acl_stage1_t2 = self._capture_acl(PRODUCTS_PATH)
+
+ # Create full backup 1
+ self._backup_now(collection_src)
+ self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)
+
+ # === Stage 2: add/remove data, add more tables, remove tables, alter schema ===
+ with self.session_scope() as session:
+ # Data changes
+ session.transaction().execute(
+ f'PRAGMA TablePathPrefix("/Root"); UPSERT INTO {ORDERS_TABLE} (id, number, txt) VALUES (11, 111, "two-stage");',
+ commit_tx=True
+ )
+ session.transaction().execute(
+ f'PRAGMA TablePathPrefix("/Root"); DELETE FROM {ORDERS_TABLE} WHERE id = 2;',
+ commit_tx=True
+ )
+
+ # Add extra table 2
+ create_table_with_data(session, EXTRA_TABLE_2)
+
+ # Remove extra_table_1
+ session.execute_scheme(f'DROP TABLE `{EXTRA_TABLE_1_PATH}`;')
+
+ # Add columns to initial tables
+ session.execute_scheme(f'ALTER TABLE `{ORDERS_PATH}` ADD COLUMN new_col Uint32;')
+
+ # Try to drop a column (may not be supported)
+ try:
+ session.execute_scheme(f'ALTER TABLE `{ORDERS_PATH}` DROP COLUMN number;')
+ except Exception:
+ logger.info("DROP COLUMN number failed or unsupported — continuing")
+
+ # Change ACLs again
+ desc_for_acl2 = self.driver.scheme_client.describe_path(ORDERS_PATH)
+ owner_role2 = getattr(desc_for_acl2, "owner", None) or "root@builtin"
+ cmd = f"GRANT SELECT ON `{ORDERS_PATH}` TO `{owner_role2.replace('`', '')}`;"
+ res = self._execute_yql(cmd)
+ assert res.exit_code == 0, "Failed to apply GRANT in stage 2"
+
+ # Copy table to new location
+ self._copy_table(ORDERS_TABLE, ORDERS_COPY_TABLE)
+
+ # Create another table
+ create_table_with_data(session, OTHER_PLACE_TABLE)
+
+ # Verify state after stage 2
+ # assert self._table_exists(ORDERS_PATH), f"{ORDERS_TABLE} should exist after stage 2"
+ assert self._table_exists(ORDERS_COPY_PATH), f"{ORDERS_COPY_TABLE} should exist after copy"
+ assert self._table_exists(PRODUCTS_PATH), f"{PRODUCTS_TABLE} should exist after stage 2"
+ assert not self._table_exists(EXTRA_TABLE_1_PATH), f"{EXTRA_TABLE_1} should be dropped"
+ assert self._table_exists(EXTRA_TABLE_2_PATH), f"{EXTRA_TABLE_2} should exist after stage 2"
+
+ # Capture state after stage 2
+ snapshot_stage2_t1 = self._capture_snapshot(ORDERS_COPY_TABLE)
+ snapshot_stage2_t2 = self._capture_snapshot(PRODUCTS_TABLE)
+ schema_stage2_t2 = self._capture_schema(PRODUCTS_PATH)
+ acl_stage2_t1 = self._capture_acl(ORDERS_COPY_PATH)
+ acl_stage2_t2 = self._capture_acl(PRODUCTS_PATH)
+
+ # Create full backup 2
+ self._backup_now(collection_src)
+ self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)
+
+ # Export backups for verification
+ export_dir, exported_items = self._export_backups(collection_src)
+ assert len(exported_items) >= 2, "Expected at least 2 exported snapshots for verification"
+
+ # Create restore collections and import exported snapshots
+ coll_restore_1 = f"coll_restore_v1_{uuid.uuid4().hex[:8]}"
+ coll_restore_2 = f"coll_restore_v2_{uuid.uuid4().hex[:8]}"
+ self._create_backup_collection(coll_restore_1, [ORDERS_TABLE, PRODUCTS_TABLE])
+ self._create_backup_collection(coll_restore_2, [ORDERS_TABLE, PRODUCTS_TABLE])
+
+ self._restore_import(export_dir, exported_items[0], coll_restore_1)
+ self._restore_import(export_dir, exported_items[1], coll_restore_2)
+
+ # Try RESTORE when tables exist -> expect fail
+ res_restore_when_exists = self._execute_yql(f"RESTORE `{coll_restore_2}`;")
+ assert res_restore_when_exists.exit_code != 0, "Expected RESTORE to fail when target tables already exist"
+
+ # Remove all tables
+ tables_to_drop = [ORDERS_TABLE, PRODUCTS_TABLE, ORDERS_COPY_TABLE, EXTRA_TABLE_2, OTHER_PLACE_TABLE]
+ self._drop_tables(tables_to_drop)
+
+ # === Restore backup 1 and verify ===
+ res_restore1 = self._execute_yql(f"RESTORE `{coll_restore_1}`;")
+ stdout1, stderr1 = self._decode_output(res_restore1)
+ assert res_restore1.exit_code == 0, f"RESTORE v1 failed: stdout={stdout1} stderr={stderr1}"
+
+ # Verify data/schema/acl for backup1
+ self._verify_restored_table_data(ORDERS_TABLE, snapshot_stage1_t1)
+ self._verify_restored_table_data(PRODUCTS_TABLE, snapshot_stage1_t2)
+
+ restored_schema_t1 = self._capture_schema(ORDERS_PATH)
+ restored_schema_t2 = self._capture_schema(PRODUCTS_PATH)
+ assert restored_schema_t1 == schema_stage1_t1, f"Schema for {ORDERS_TABLE} after restore v1 differs"
+ assert restored_schema_t2 == schema_stage1_t2, f"Schema for {PRODUCTS_TABLE} after restore v1 differs"
+
+ restored_acl_t1 = self._capture_acl(ORDERS_PATH)
+ restored_acl_t2 = self._capture_acl(PRODUCTS_PATH)
+ if 'show_grants' in (acl_stage1_t1 or {}):
+ assert 'show_grants' in (restored_acl_t1 or {}) and acl_stage1_t1['show_grants'] in restored_acl_t1['show_grants']
+ if 'show_grants' in (acl_stage1_t2 or {}):
+ assert 'show_grants' in (restored_acl_t2 or {}) and acl_stage1_t2['show_grants'] in restored_acl_t2['show_grants']
+
+ # Remove all tables again
+ self._drop_tables([ORDERS_TABLE, PRODUCTS_TABLE])
+
+ # === Restore backup 2 and verify ===
+ res_restore2 = self._execute_yql(f"RESTORE `{coll_restore_2}`;")
+ stdout2, stderr2 = self._decode_output(res_restore2)
+ assert res_restore2.exit_code == 0, f"RESTORE v2 failed: stdout={stdout2} stderr={stderr2}"
+
+ # Verify data/schema/acl for backup2
+ # Check if copied table exists
+ if self._table_exists(ORDERS_PATH):
+ self._verify_restored_table_data(ORDERS_TABLE, snapshot_stage2_t1)
+ elif self._table_exists(ORDERS_COPY_PATH):
+ self._verify_restored_table_data(ORDERS_COPY_TABLE, snapshot_stage2_t1)
+ else:
+ raise AssertionError("Neither orders nor orders_copy table exists after restore v2")
+
+ self._verify_restored_table_data(PRODUCTS_TABLE, snapshot_stage2_t2)
+
+ # Verify schema for v2
+ if self._table_exists(ORDERS_PATH):
+ restored_schema2_t1 = self._capture_schema(ORDERS_PATH)
+ else:
+ restored_schema2_t1 = self._capture_schema(ORDERS_COPY_PATH)
+
+ restored_schema2_t2 = self._capture_schema(PRODUCTS_PATH)
+
+ # Schema may differ due to ALTER operations, so just verify presence
+ assert restored_schema2_t1 is not None, "Should have schema for orders table after restore v2"
+ assert restored_schema2_t2 == schema_stage2_t2, f"Schema for {PRODUCTS_TABLE} after restore v2 differs"
+
+ # Verify ACL for v2
+ if self._table_exists(ORDERS_PATH):
+ restored_acl2_t1 = self._capture_acl(ORDERS_PATH)
+ else:
+ restored_acl2_t1 = self._capture_acl(ORDERS_COPY_PATH)
+
+ restored_acl2_t2 = self._capture_acl(PRODUCTS_PATH)
+
+ if 'show_grants' in (acl_stage2_t1 or {}):
+ assert 'show_grants' in (restored_acl2_t1 or {}) and acl_stage2_t1['show_grants'] in restored_acl2_t1['show_grants']
+ if 'show_grants' in (acl_stage2_t2 or {}):
+ assert 'show_grants' in (restored_acl2_t2 or {}) and acl_stage2_t2['show_grants'] in restored_acl2_t2['show_grants']
+
+ # Cleanup
+ if os.path.exists(export_dir):
+ shutil.rmtree(export_dir)